Source code for message_ix_models.model.transport.workflow

import logging
from copy import deepcopy
from typing import TYPE_CHECKING, Literal

from message_ix_models.model.workflow import Config as WorkflowConfig
from message_ix_models.util import minimum_version

if TYPE_CHECKING:
    from sdmx.model.common import Code

    from message_ix_models.util.context import Context
    from message_ix_models.workflow import Workflow

    from .config import Config

log = logging.getLogger(__name__)


#: Default :class:`.workflow.Config` for solving MESSAGEix-Transport.
#:
#: - :py:`lpmethod=4, scaind=1` to overcome LP status 5 (optimal with unscaled
#:   infeasibilities) when running on SSP(2024) base scenarios in some situations.
#: - :py:`iis=1` to display verbose conflict information on infeasibility.
#: - :py:`threads=8` for performance on UniCC.
#: - :py:`tilim=45 * 60` to limit runtime to 45 minutes on IIASA-hosted GitHub Actions
#:   runners.
SOLVE_CONFIG = WorkflowConfig(
    reserve_margin=False,
    solve=dict(
        model="MESSAGE",
        solve_options=dict(
            iis=1,
            lpmethod=6,
            scaind=0,
            threads=8,
            tilim=45 * 60,
        ),
    ),
)


[docs] def add_steps(wf: "Workflow", base: str, scenario_code: "Code") -> str: """Add 0 or more MESSAGEix-Transport workflow steps to `wf`. If `scenario_code` does not contain annotations necessary to configure MESSAGEix-Transport, `base` is returned and no changes are made to `wf`. Otherwise, the following steps are added: 1. :func:`.transport.build.main`. This step clones the Scenario from `base` to a URL given by :meth:`.Config.get_target_url`. 2. If :attr:`.Config.policy` is truth-y (that is, contains any :class:`.Policy` instances), call :func:`.model.workflow.step_0` and remove values for the "bound_emission" parameter. Otherwise, no action. 3. :func:`message_ix.tools.migrate.initial_new_capacity_up_v311`. The name of (3) is returned. As well, the following additional steps are added: - "[…] debug build". This is the same as (1), except giving :py:`dry_run=True`, so the scenario is not modified; only debug output is generated. See :func:`.transport.build.main`. - "T debug multi": Collects all "[…] debug build" steps. Repeated calls to :func:`.add_steps` extend the list. - "T report multi": :func:`.transport.report.multi`, called on all the target URLs for (1). Repeated calls to :func:`.add_steps` extend the list. """ from message_ix.tools.migrate import initial_new_capacity_up_v311 from message_ix_models.model.workflow import step_0 from . import build from .config import Config from .report import callback, multi # Retrieve the Context used for the Workflow context: "Context" = wf.graph["context"] try: # Retrieve .transport.Config and make a copy for this particular workflow branch config = deepcopy(context.transport) except AttributeError: # Not yet configured → use defaults config = Config.from_context(context) try: # Update the .transport.Config from the `scenario_code` config.code = scenario_code except KeyError: # `scenario_code` does not contain annotations for .model.transport; do nothing return base # Use transport reporting if callback not in context.report.callback: context.report.register(callback) # Short label for workflow step names label = f"{config.code.id} T" # Identify the target of the build step target_url = config.get_target_url(context) # Build MESSAGEix-Transport on the scenario name = wf.add_step( f"{label} built", base, build.main, target=target_url, clone=True, config=config ) if config.policy: # Prepare emissions accounting for carbon pricing kw = dict(remove_emission_parameters=("bound_emission",)) name = wf.add_step(f"{label} step_0", name, step_0, **kw) # Adjust initial_new_capacity_up values for message_ix#924 name = wf.add_step( f"{label} incu adjusted", name, lambda _, s: initial_new_capacity_up_v311(s, safety_factor=1.05), ) # 'Simulate' build and produce debug outputs debug = f"{label} debug build" wf.add_step(debug, base, build.main, config=config, dry_run=True) # Compare debug outputs from multiple simulated builds if "T debug build" not in wf: # NB Here we use genno.Computer.add(), not .Workflow.add_step(). This is because # because the operations are not WorkflowSteps that receive, modify, and # return Scenario objects—only ordinary Python functions. wf.add("T debug build", build.debug_multi, "context") # Append the step ID for the debug step wf.graph["T debug build"] += (debug,) # Report (including plot) using data from multiple, solved scenarios if "T report multi" not in wf: wf.add("T report multi", multi, "context", "T targets") wf.add("T targets", []) # Append this target URL to the list of target URLs wf.graph["T targets"].append(target_url) return name
[docs] def base_scenario_url( context: "Context", config: "Config", method: Literal["auto", "bare"] = "bare" ) -> str: """Identify the base MESSAGEix-GLOBIOM scenario. If :attr:`.scenario_info` is set on `context` (for instance, provided via the :program:`--url` CLI option), nothing is done, and the URL corresponding to that scenario is returned. If not, then the behaviour depends on `method`: :py:`method = "auto"` Automatically identify the base scenario URL from the contents of ``CL_TRANSPORT_SCENARIO``. The settings :attr:`.Config.ssp <.transport.config.Config.ssp>` and :attr:`.Config.policy` are used to match an entry in the file. :py:`method = "bare"` Construct bare RES scenario using :mod:`.model.bare.create_res` and the settings on `context` such as :attr:`.Config.regions`. Return the URL to this scenario. """ if context.scenario_info: assert context.core.url return context.core.url if method == "auto": return config.base_scenario_url elif method == "bare": # Use a 'bare' RES or empty scenario if context.platform_info["name"] in (__name__, "message-ix-models"): # Temporary platform or testing → use the bare RES from message_ix_models.model import bare log.info("No --model/--scenario/--url; use the bare RES as base") # Build a bare RES scenario given .model.Config settings s = bare.create_res(context) return f"ixmp://{context.platform_info['name']}/{s.url}" else: log.warning("No --model/--scenario/--url; some workflow steps may not work") return f"ixmp://{context.platform_info.get('name', 'NONE')}/NONE/NONE"
[docs] def maybe_use_temporary_platform(context: "Context") -> None: """Set up a temporary, in-memory platform. .. todo:: Move upstream, to :mod:`message_ix_models`. """ if context.platform_info: return from ixmp import config as ixmp_config ixmp_config.add_platform( __name__, "jdbc", "hsqldb", url=f"jdbc:hsqldb:mem:{__name__}" ) context.platform_info.update(name=__name__) log.info("No --platform/--url; using temporary, in-memory database")
[docs] @minimum_version("message_ix 3.11") def generate( context: "Context", *, report_key: str = "transport all", **options ) -> "Workflow": """Generate the MESSAGEix-Transport :class:`.Workflow`.""" from message_ix_models.model.workflow import from_codelist from .config import CL_SCENARIO, Config # Handle CLI options # TODO respect quiet options.pop("target_model_name", None) # Stored on context.core.dest_scenario options.pop("target_scenario_name", None) # Stored on context.core.dest_scenario base_scenario_method = options.pop("base_scenario") del base_scenario_method maybe_use_temporary_platform(context) # Prepare base/common transport configuration, passing the remaining `options` Config.from_context(context, options=options) # Set the default .report.Config key for ".* reported" steps context.report.key = report_key # Set options for solving context.solve = SOLVE_CONFIG return from_codelist(context, CL_SCENARIO)