"""Utilities for testing :mod:`~message_ix_models.model.transport`."""
import logging
import platform
from collections.abc import Callable, Hashable, Mapping
from contextlib import nullcontext
from pathlib import Path
from typing import TYPE_CHECKING, Optional, Union
import pytest
from message_ix import ModelError, Reporter, Scenario
import message_ix_models.report
from message_ix_models import Context, ScenarioInfo
from message_ix_models.report.sim import add_simulated_solution
from message_ix_models.testing import GHA, bare_res
from message_ix_models.util import identify_nodes, silence_log
from message_ix_models.util.graphviz import HAS_GRAPHVIZ
from . import Config, build
import pandas
import pint
from genno import Computer
log = logging.getLogger(__name__)

# Common marks for transport code. Do not reuse keys that are less than the highest key
# appearing in the dict.
#
# Each value is a ready-made pytest mark decorator; keys are either small integers or
# "gh-NNN" strings that name the pull request the mark was added for.
MARK: dict[Hashable, pytest.MarkDecorator] = {
    0: pytest.mark.xfail(
        reason="Missing R14 input data/config", raises=FileNotFoundError
    ),
    1: pytest.mark.skip(
        reason="Currently only possible with regions=R12 input data/config",
    ),
    3: pytest.mark.xfail(raises=ValueError, reason="Missing ISR/mer-to-ppp.csv"),
    4: pytest.mark.xfail(reason="Currently unsupported"),
    # Tests that fail with data that cannot be migrated from message_data
    # NOTE(review): no entry matching the heading above is visible here; the next
    # entry concerns Graphviz availability. Confirm whether the heading is stale.
    7: pytest.mark.xfail(
        # Only expected to fail on GitHub Actions macOS runners without Graphviz
        condition=GHA and platform.system() == "Darwin" and not HAS_GRAPHVIZ,
        reason="Graphviz missing on macos-13 GitHub Actions runners",
    ),
    "gh-281": pytest.mark.xfail(
        reason="Temporary, for https://github.com/iiasa/message-ix-models/pull/281",
    ),
    9: pytest.mark.xfail(reason="Missing R14 input data/config"),
    "gh-288": pytest.mark.xfail(
        reason="Temporary, for https://github.com/iiasa/message-ix-models/pull/288",
    ),
}
# Factories for marks that need one argument at the point of use. Each value is a
# callable that returns a pytest mark decorator:
#
# - 2: `t` is the exception type (or types) the test is expected to raise.
# - 5: `f` is interpolated into the reason string to identify the non-public data.
make_mark: dict[int, Callable[..., pytest.MarkDecorator]] = {
    2: lambda t: pytest.mark.xfail(
        reason="Missing input data/assumptions for this node codelist", raises=t
    ),
    5: lambda f: pytest.mark.xfail(
        raises=FileNotFoundError, reason=f"Requires non-public data ({f})"
    ),
}
def assert_units(
    df: "pandas.DataFrame", expected: Union[str, dict, "pint.Unit", "pint.Quantity"]
) -> None:
    """Assert that `df` has the unique, `expected` units.

    The "unit" column of `df` must contain exactly one distinct value. That value is
    converted to something comparable with `expected`, depending on the type of
    `expected`:

    - :class:`pint.Quantity`: compared with a quantity of magnitude 1.0 in the units
      from `df`.
    - a mapping: compared with the *dimensionality* of the units from `df`, rather
      than the units themselves.
    - anything else (for instance :class:`str` or :class:`pint.Unit`): compared with
      an instance of the same class constructed from the unit string.

    Raises
    ------
    AssertionError
        if the units are not unique or do not match `expected`.
    """
    import pint
    from iam_units import registry

    all_units = df["unit"].unique()
    assert 1 == len(all_units), f"Non-unique {all_units = }"

    # Convert the unique value to the same class as `expected`
    if isinstance(expected, pint.Quantity):
        assert expected == expected.__class__(1.0, all_units[0])
    elif isinstance(expected, Mapping):
        # Compare dimensionality of the units, rather than exact match. An empty unit
        # string is replaced by "0" so that it can still be parsed as a quantity.
        assert expected == registry.Quantity(all_units[0] or "0").dimensionality
    else:
        assert expected == expected.__class__(all_units[0])
def built_transport(
    request,
    context: Context,
    options: Optional[dict] = None,
    solved: bool = False,
    quiet: bool = True,
) -> Scenario:
    """Analogous to :func:`.testing.bare_res`, with transport detail added.

    Parameters
    ----------
    request :
        The pytest ``request`` fixture. It is passed on to :func:`.bare_res`, and
        ``request.node.name`` is used as the scenario name of the returned clone.
    context :
        Context passed to :func:`.bare_res`, :func:`.build.main` and
        :meth:`.Config.from_context`.
    options :
        Passed to :func:`.build.main` when the transport scenario must be created, or
        to :meth:`.Config.from_context` when it already exists.
    solved :
        If :any:`True`, ensure the base transport scenario has a solution and keep
        that solution in the returned clone.
    quiet :
        If :any:`True`, silence log output from the code invoked through
        :func:`.build.main`.

    Returns
    -------
    Scenario
        A clone of the "baseline" transport scenario, named after the requesting test.
    """
    options = options or dict()

    # Retrieve (maybe generate) the bare RES with the same settings
    res = bare_res(request, context, solved)

    # Derive the name for the transport scenario
    model_name = res.model.replace("-GLOBIOM", "-Transport")

    try:
        scenario = Scenario(res.platform, model_name, "baseline")
    except ValueError:
        log.info(f"Create '{model_name}/baseline' for testing")

        # Optionally silence logs for code used via build.main()
        log_cm = (
            silence_log("genno message_ix_models.model.transport message_ix_models")
            if quiet
            else nullcontext()
        )

        with log_cm:
            scenario = res.clone(model=model_name)
            build.main(context, scenario, options, fast=True)
    else:
        # Loaded existing Scenario; ensure config files are loaded on `context`
        Config.from_context(context, options=options)

    if solved and not scenario.has_solution():
        log.info(f"Solve '{scenario.model}/{scenario.scenario}'")
        # Solve with default options
        scenario.solve()

    log.info(f"Clone to '{model_name}/{request.node.name}'")
    result = scenario.clone(scenario=request.node.name, keep_solution=solved)

    # Same GitHub Actions + macOS guard as used for MARK[7]
    if (
        GHA
        and platform.system() == "Darwin"
        and identify_nodes(result) != context.model.regions
    ):
        pytest.xfail(
            reason="Known issue on GitHub Actions macOS runners: result has nodes "
            f"{identify_nodes(result) = !r} != {identify_nodes(res) = !r} == "
            f"{context.model.regions = !r}"
        )

    return result
def simulated_solution(request, context) -> Reporter:
    """Return a :class:`.Reporter` with a simulated model solution.

    The contents allow for fast testing of reporting code, without solving an actual
    scenario.

    Parameters
    ----------
    request :
        The pytest ``request`` fixture; passed to :func:`built_transport`.
    context :
        Context passed to :func:`built_transport` and to
        :func:`message_ix_models.report.prepare_reporter`.
    """
    from .report import callback

    # Build the base model
    scenario = built_transport(request, context, solved=False)

    # Info about the built model
    info = ScenarioInfo(scenario)

    config: "Config" = context.transport
    # Not used yet by the simulated data below; see the "TODO expand" note
    technologies = config.spec.add.set["technology"]

    # Create a reporter
    rep = Reporter.from_scenario(scenario)

    # Add simulated solution data
    # TODO expand
    # NOTE(review): these look like the columns of a single item. Confirm whether
    # add_simulated_solution() expects them nested under an item name.
    data = dict(
        nl=[info.N[-1]] * 2,
        t=["ELC_100", "ELC_100"],
        yv=[2020, 2020],
        ya=[2020, 2025],
        value=[1.0, 1.1],
    )
    add_simulated_solution(rep, info, data)

    # Register the callback to set up transport reporting
    message_ix_models.report.register(callback)

    # Prepare the reporter
    with silence_log("genno", logging.CRITICAL):
        message_ix_models.report.prepare_reporter(context, reporter=rep)

    log.debug(f"simulated_solution: {context.regions = }")

    return rep