import json
import logging
from hashlib import blake2s
from itertools import product
from typing import TYPE_CHECKING, Literal, Optional
from genno import KeyExistsError
from message_ix_models.model.workflow import Config as WorkflowConfig
from message_ix_models.util import package_data_path
if TYPE_CHECKING:
from message_ix import Scenario
from message_ix_models.util.context import Context
from message_ix_models.workflow import Workflow
from .config import Config
log = logging.getLogger(__name__)
# Use lpmethod=4, scaind=1 to overcome LP status 5 (optimal with unscaled
# infeasibilities) when running on SSP(2024) base scenarios
SOLVE_CONFIG = WorkflowConfig(
reserve_margin=False,
solve=dict(
model="MESSAGE",
solve_options=dict(
iis=1,
lpmethod=4,
scaind=1,
),
),
)
[docs]
def base_scenario_url(
context: "Context", method: Literal["auto", "bare"] = "bare"
) -> str:
"""Identify the base MESSAGEix-GLOBIOM scenario.
If :attr:`.scenario_info` is set on `context` (for instance, provided via the
:program:`--url` CLI option), nothing is done, and the URL corresponding to that
scenario is returned.
If not, then the behaviour depends on `method`:
:py:`method = "auto"`
Automatically identify the base scenario URL from the contents of
:file:`base-scenario-url.json`. The settings :attr:`.Config.ssp
<.transport.config.Config.ssp>` and :attr:`.Config.policy` are used to match an
entry in the file.
:py:`method = "bare"`
Construct bare RES scenario using :mod:`.model.bare.create_res` and the settings
on `context` such as :attr:`.Config.regions`. Return the URL to this scenario.
"""
if context.scenario_info:
return context.core.url
config: "Config" = context.transport
if method == "auto":
# Load URL info from file
with open(package_data_path("transport", "base-scenario-url.json")) as f:
info = json.load(f)
# Identify a key that matches the settings on `config`
key = (str(config.ssp), config.policy)
for item in info:
if (item["ssp"], item["policy"]) == key:
return item["url"]
raise ValueError(f"No base URL for ({key!r})") # pragma: no cover
elif method == "bare":
# Use a 'bare' RES or empty scenario
if context.platform_info["name"] in (__name__, "message-ix-models"):
# Temporary platform or testing → use the bare RES
from message_ix_models.model import bare
log.info("No --model/--scenario/--url; use the bare RES as base")
# Build a bare RES scenario given .model.Config settings
s = bare.create_res(context)
return f"ixmp://{context.platform_info['name']}/{s.url}"
else:
log.warning("No --model/--scenario/--url; some workflow steps may not work")
return f"ixmp://{context.platform_info.get('name', 'NONE')}/NONE/NONE"
[docs]
def scenario_url(context: "Context", label: Optional[str] = None) -> str:
"""Construct a target URL for a built MESSAGEix-Transport scenario.
If the :attr:`.dest` URL is set on `context` (for instance, provided via the
:program:`--dest` CLI option), this URL returned with `label` appended to the
scenario name.
If not, a form is used like:
- :py:`model = "MESSAGEix-GLOBIOM 1.1-T-{regions}"`. Any value of the "model" key
from :attr:`.core.Config.dest_scenario` is appended.
- :py:`scenario = "{label}"`. Any value of the "scenario" key from
:attr:`.core.Config.dest_scenario` is appended; if this is not set, then either
"policy" (if :attr:`.transport.Config.policy` is set) or "baseline".
"""
# Construct a URL template for MESSAGEix-Transport scenarios
if context.core.dest:
# Value from --dest CLI option
# TODO Check that this works if a version # is specified
return f"{context.dest} {label or ''}".strip()
else:
# Values from --model-extra, --scenario-extra CLI options
m_extra = context.core.dest_scenario.get("model", "")
s_extra = context.core.dest_scenario.get("scenario") or (
"policy" if context.transport.policy else "baseline"
)
return "/".join(
(
f"MESSAGEix-GLOBIOM 1.1-T-{context.model.regions} {m_extra}".rstrip(),
f"{label or ''} {s_extra}".strip(),
)
)
[docs]
def short_hash(value: str) -> str:
"""Return a short (length 3) hash of `value`."""
return blake2s(value.encode()).hexdigest()[:3]
[docs]
def tax_emission(context: "Context", scenario: "Scenario", price: float) -> "Scenario":
"""Add emission tax.
This function calls code from :mod:`message_data.projects.navigate.workflow`,
:mod:`message_data.tools.utilities`, and other non-public locations. It cannot be
used without access to those codes.
"""
from message_ix import make_df
from message_ix_models.util import broadcast
try:
from message_data.projects.engage import workflow as engage_workflow
from message_data.projects.navigate import workflow as navigate_workflow
except ImportError:
raise RuntimeError("Requires non-public code from message_data")
# Add ENGAGE-style emissions accounting
scenario = engage_workflow.step_0(context, scenario)
# Add values for the MACRO 'drate' parameter.
# message_data.tools.utilities.add_tax_emission() refers to this parameter, rather
# than the MESSAGE 'interestrate' parameter, to compute nominal future values of the
# tax. The parameter is not present if MACRO has not been set up on the scenario.
name = "drate"
df = make_df(name, value=0.05, unit="-").pipe(broadcast, node=scenario.set("node"))
with scenario.transact(f"Add values for {name}"):
scenario.add_par(name, df)
return navigate_workflow.tax_emission(context, scenario, price)
[docs]
def generate(
context: "Context",
*,
report_key: str = "transport all",
dry_run: bool = False,
**options,
) -> "Workflow":
from message_ix_models import Workflow
from message_ix_models.model.workflow import solve
from message_ix_models.project.ssp import SSP_2024
from message_ix_models.report import report
from . import build
from .config import Config, get_cl_scenario
from .report import multi
# Handle CLI options
# TODO respect quiet
options.pop("target_model_name", None) # Stored on context.core.dest_scenario
options.pop("target_scenario_name", None) # Stored on context.core.dest_scenario
base_scenario_method = options.pop("base_scenario")
maybe_use_temporary_platform(context)
# Prepare transport configuration, passing the remaining `options`
Config.from_context(context, options=options)
# Set the default .report.Config key for ".* reported" steps
context.report.key = report_key
context.report.register("model.transport")
# Create the workflow
wf = Workflow(context)
# Collections of step names
debug, reported, targets = [], [], []
# Iterate over all (ssp, policy) combinations
cl_scenario = get_cl_scenario()
for scenario_code, policy in product(cl_scenario, (False, True)):
# Retrieve information from annotations on `scenario_code`
ssp_urn = str(scenario_code.get_annotation(id="SSP-URN").text)
is_LED = scenario_code.eval_annotation(id="is-LED-scenario")
EDITS_activity = scenario_code.eval_annotation(id="EDITS-activity-id")
# Look up the SSP_2024 code
ssp = SSP_2024.by_urn(ssp_urn)
# Store settings on the context
context.transport.ssp = ssp
context.transport.policy = policy
context.transport.project["LED"] = is_LED
# Construct labels including the SSP code and policy identifier
# ‘Short’ label used for workflow steps
label = f"{scenario_code.id}{' policy' if policy else ''}"
# ‘Full’ label used in the scenario name
if not is_LED and EDITS_activity is None:
label_full = f"SSP_2024.{ssp.name}"
else:
label_full = label
if policy and (is_LED or EDITS_activity is not None): # TEMPORARY
log.info(f"({label_full}, {policy=}) → skip")
continue
# Identify the base scenario
base_url = base_scenario_url(context, base_scenario_method)
log.info(f"({label_full}, {policy=}) → {base_url=}")
# Name of the base step
base = f"base {short_hash(base_url)}"
try:
# Load the base model scenario
wf.add_step(base, None, target=base_url)
except KeyExistsError:
# Base scenario URL is identical to another (ssp, policy) combination; use
# the scenario returned by that step
pass
# Identify the target of the build step
target_url = scenario_url(context, label_full)
targets.append(target_url)
# Build MESSAGEix-Transport on the scenario
name = wf.add_step(
f"{label} built", base, build.main, target=target_url, clone=True, ssp=ssp
)
# This block copied from message_data.projects.navigate.workflow
if policy:
# Add a carbon tax
name = wf.add_step(f"{label} with tax", name, tax_emission, price=1000.0)
# 'Simulate' build and produce debug outputs
debug.append(f"{label} debug build")
wf.add_step(debug[-1], base, build.main, ssp=ssp, dry_run=True)
# Solve
wf.add_step(f"{label} solved", name, solve, config=SOLVE_CONFIG)
# Report
reported.append(f"{label} reported")
wf.add_step(reported[-1], f"{label} solved", report)
# NB the following use genno.Computer.add(), not .Workflow.add_step(). This is
# because the operations are not WorkflowSteps that receive, modify, and return
# Scenario objects—only ordinary Python functions.
# Compare debug outputs from multiple simulated builds
wf.add("debug build", build.debug_multi, "context", *debug)
# Report (including plot) using data from multiple, solved scenarios
wf.add("report multi", multi, "context", targets=targets)
# Report all the scenarios
wf.add("all reported", reported)
wf.default_key = "all reported"
return wf