"""Build MESSAGEix-Transport on a base model."""
import logging
from importlib import import_module
from pathlib import Path
from typing import TYPE_CHECKING, Any, Dict, Optional, Tuple
import pandas as pd
from genno import Computer, KeyExistsError, Quantity, quote
from message_ix import Scenario
from message_ix_models import Context, ScenarioInfo
from message_ix_models.model import bare, build
from message_ix_models.util import minimum_version
from message_ix_models.util._logging import mark_time
from . import Config
from .structure import get_technology_groups
if TYPE_CHECKING:
import pathlib
from genno.types import AnyQuantity
log = logging.getLogger(__name__)
[docs]def write_report(qty: "AnyQuantity", path: Path, kwargs=None) -> None:
"""Similar to :func:`.genno.operator.write_report`, but include units.
.. todo:: Move upstream, to :mod:`genno`.
"""
from genno import operator
from message_ix_models.util import datetime_now_with_tz
kwargs = kwargs or dict()
kwargs.setdefault(
"header_comment",
f"""`{qty.name}` data from MESSAGEix-Transport calibration.
Generated: {datetime_now_with_tz().isoformat()}
Units: {qty.units:~}
""",
)
operator.write_report(qty, path, kwargs)
[docs]def add_debug(c: Computer) -> None:
"""Add tasks for debugging the build."""
from genno import Key, KeySeq
from .key import gdp_cap, ms, pdt_nyt
context: Context = c.graph["context"]
config: Config = context.transport
# Path to output file
if config.with_scenario and config.with_solution:
# Output to a directory corresponding to the Scenario URL
label = c.graph["scenario"].url.replace("/", "_")
else:
# Output to a directory name constructed from settings
# Remove ":" to be compatible with actions/upload-artifact
ssp = str(config.ssp).replace(":", "_")
label = f"{ssp}-{context.model.regions}-{context.model.years}"
output_dir = context.get_local_path("transport", f"debug-{label}")
output_dir.mkdir(exist_ok=True, parents=True)
# Store in the config, but not at "output_dir" that is used by e.g. reporting
c.graph["config"]["transport build debug dir"] = output_dir
# FIXME Duplicated from base.prepare_reporter()
e_iea = Key("energy:n-y-product-flow:iea")
e_fnp = KeySeq(e_iea.drop("y"))
e_cnlt = Key("energy:c-nl-t:iea+0")
# Transform IEA EWEB data for comparison
c.add(e_fnp[0], "select", e_iea, indexers=dict(y=2020), drop=True)
c.add(e_fnp[1], "aggregate", e_fnp[0], "groups::iea to transport", keep=False)
c.add(e_cnlt, "rename_dims", e_fnp[1], quote(dict(flow="t", n="nl", product="c")))
# Write some intermediate calculations from the build process to file
debug_keys = []
for i, (key, stem) in enumerate(
(
(gdp_cap, "gdp-ppp-cap"),
(pdt_nyt, "pdt"),
(pdt_nyt + "capita+post", "pdt-cap"),
(ms, "mode-share"),
(e_fnp[0], "energy-iea-0"),
(e_cnlt, "energy-iea-1"),
)
):
debug_keys.append(f"transport debug {i}")
c.add(debug_keys[-1], write_report, key, output_dir.joinpath(f"{stem}.csv"))
def _(*args) -> "pathlib.Path":
"""Do nothing with the computed `args`, but return `output_path`."""
return output_dir
debug_plots = (
"demand-exo demand-exo-capita demand-exo-capita-gdp inv_cost"
# FIXME The following currently don't work, as their required/expected input
# keys (from the post-solve/report step) do not exist in the build step
# " var-cost fix-cost"
).split()
c.add(
"transport build debug",
_,
# NB To omit some or all of these calculations / plots from the debug outputs
# for individuals, comment 1 or both of the following lines
*debug_keys,
*[f"plot {p}" for p in debug_plots],
)
# log.info(c.describe("transport build debug"))
# Also generate these debugging outputs when building the scenario
c.graph["add transport data"].append("transport build debug")
[docs]def debug_multi(context: Context, *paths: Path) -> None:
"""Generate plots comparing data from multiple build debug directories."""
from .plot import ComparePDT, ComparePDTCap0, ComparePDTCap1
if isinstance(paths[0], Scenario):
# Workflow was called with --from="…", so paths from the previous step are not
# available; try to guess
paths = sorted(
filter(
Path.is_dir, context.get_local_path("transport").glob("debug-ICONICS_*")
)
)
c = Computer(config={"transport build debug dir": paths[0].parent})
c.require_compat("message_ix_models.report.operator")
for cls in (ComparePDT, ComparePDTCap0, ComparePDTCap1):
key = c.add(f"compare {cls.basename}", cls, *paths)
c.get(key)
[docs]def add_exogenous_data(c: Computer, info: ScenarioInfo) -> None:
"""Add exogenous data to `c` that mocks data coming from an actual Scenario.
The specific quantities added are:
- ``GDP:n-y``, from GEA, SSP, or SHAPE data; see :func:`.gdp_pop`.
- ``PRICE_COMMODITY:n-c-y``, currently mocked based on the shape of ``GDP:n-y``
using :func:`.dummy_prices`.
.. todo:: Add an external data source.
- ``MERtoPPP:n-y``, from :file:`mer-to-ppp.csv`. If ``context.model.regions`` is
“R14”, data are adapted from R11 using :obj:`.adapt_R11_R14`.
See also
--------
:doc:`/reference/model/transport/input`
"""
# Ensure that the SSPOriginal and SSPUpdate data providers are available
import message_ix_models.project.advance.data # noqa: F401
import message_ix_models.project.ssp.data # noqa: F401
import message_ix_models.tools.iea.web # noqa: F401
from message_ix_models.project.ssp import SSP_2017, SSP_2024
from message_ix_models.tools.exo_data import prepare_computer
# Ensure that the MERtoPPP data provider is available
from . import data # noqa: F401
# Added keys
keys = {}
context = c.graph["context"]
config: "Config" = c.graph["config"]["transport"]
# Identify appropriate source keyword arguments for loading GDP and population data
source = str(config.ssp)
if config.ssp in SSP_2017:
source_kw: Tuple[Dict[str, Any], ...] = (
dict(measure="GDP", model="IIASA GDP"),
dict(measure="POP", model="IIASA GDP"),
)
elif config.ssp in SSP_2024:
source_kw = (
dict(measure="GDP", model="IIASA GDP 2023"),
dict(measure="POP"),
)
for kw in source_kw:
keys[kw["measure"]] = prepare_computer(
context, c, source, source_kw=kw, strict=False
)
# Add data for MERtoPPP
kw = dict(measure="MERtoPPP", nodes=context.model.regions)
prepare_computer(
context, c, "message_ix_models.model.transport", source_kw=kw, strict=False
)
# Add IEA Extended World Energy Balances data; select only the flows related to
# transport
kw = dict(
provider="OECD",
edition="2022",
flow=[
"DOMESAIR",
"DOMESNAV",
"PIPELINE",
"RAIL",
"ROAD",
"TOTTRANS",
"TRNONSPE",
"WORLDAV",
"WORLDMAR",
],
)
prepare_computer(context, c, "IEA_EWEB", source_kw=kw, strict=False)
# Alias for use in reporting
# TODO Fix the upstream code so that the name is not "unknown"
c.add("energy:n-y-product-flow:iea", "unknown:n-y-product-flow")
# Add IEA Future of Trucks data
for kw in dict(measure=1), dict(measure=2):
prepare_computer(context, c, "IEA Future of Trucks", source_kw=kw, strict=False)
# Add ADVANCE data
common = dict(model="MESSAGE", scenario="ADV3TRAr2_Base", aggregate=False)
for n, m, u in (
("pdt ldv", "Transport|Service demand|Road|Passenger|LDV", "Gp km / a"),
("fv", "Transport|Service demand|Road|Freight", "Gt km"),
):
# Add the base data
kw = dict(measure=m, name=f"advance {n}")
kw.update(common)
key, *_ = prepare_computer(context, c, "ADVANCE", source_kw=kw, strict=False)
# Broadcast to R12
c.add(f"{n}:n:advance", "broadcast_advance", key, "y0", "config")
# Alias for other computations which expect the upper-case name
c.add("MERtoPPP:n-y", "mertoppp:n-y")
try:
c.add("GDP:n-y", "gdp:n-y", strict=True)
except KeyExistsError as e:
log.info(repr(e)) # Solved scenario that already has this key
# Ensure correct units
c.add("population:n-y", "mul", "pop:n-y", Quantity(1.0, units="passenger"))
# Dummy prices
try:
c.add(
"PRICE_COMMODITY:n-c-y",
"dummy_prices",
keys["GDP"][0],
sums=True,
strict=True,
)
except KeyExistsError as e:
log.info(repr(e)) # Solved scenario that already has this key
# Data from files
from .files import FILES, ExogenousDataFile
# Identify the mode-share file according to the config setting
ExogenousDataFile(
("mode-share", config.mode_share),
"mode share:n-t:ref",
"Reference (base year) mode share",
)
for f in FILES:
c.add("", f, context=context)
[docs]def add_structure(c: Computer):
"""Add keys to `c` for model structure required by demand computations.
This uses `info` to mock the contents that would be reported from an already-
populated Scenario for sets "node", "year", and "cat_year".
"""
from operator import itemgetter
from ixmp.report import configure
config: "Config" = c.graph["context"].transport
info = config.base_model_info # Information about the base scenario
spec = config.spec # Specification for MESSAGEix-Transport structure
# Update RENAME_DIMS with transport-specific concepts/dimensions. This allows to use
# genno.operator.load_file(…, dims=RENAME_DIMS) in add_exogenous_data()
# TODO move to a more appropriate location
configure(
rename_dims={
"area_type": "area_type",
"attitude": "attitude",
"census_division": "census_division",
"consumer_group": "cg",
"driver_type": "driver_type",
"vehicle_class": "vehicle_class",
}
)
for key, *comp in (
# Configuration
("info", lambda c: c.transport.base_model_info, "context"),
("dry_run", lambda c: c.core.dry_run, "context"),
# Structure
("c::transport", quote(info.set["commodity"])),
("cg", quote(spec.add.set["consumer_group"])),
("indexers:cg", spec.add.set["consumer_group indexers"]),
("n", quote(list(map(str, info.set["node"])))),
("nodes", quote(info.set["node"])),
("indexers:scenario", quote(dict(scenario=repr(config.ssp).split(":")[1]))),
("t::transport modes", quote(config.demand_modes)),
("y", quote(info.set["year"])),
(
"cat_year",
pd.DataFrame([["firstmodelyear", info.y0]], columns=["type_year", "year"]),
),
):
try:
c.add(key, *comp, strict=True) # Raise an exception if `key` exists
except KeyExistsError:
continue # Already present; don't overwrite
# Retrieve information about the model structure
technologies = spec.add.set["technology"]
t_groups = get_technology_groups(spec)
# Lists and subsets
c.add("c::transport", quote(spec.add.set["commodity"]))
c.add("t::transport", quote(technologies))
# Create a quantity for broadcasting t to t, c, l
c.add("input_commodity_level", "broadcast:t-c-l", "t::transport", quote("final"))
# List of nodes excluding "World"
# TODO move upstream, to message_ix
c.add("n::ex world", "nodes_ex_world", "n")
c.add(
"n:n:ex world",
lambda n: Quantity([1.0] * len(n), coords={"n": n}),
"n::ex world",
)
c.add("n::ex world+code", "nodes_ex_world", "nodes")
c.add("nl::world agg", "nodes_world_agg", "config")
# Model periods only
c.add("y::model", "model_periods", "y", "cat_year")
c.add("y0", itemgetter(0), "y::model")
c.add("broadcast:y-yv-ya", "broadcast_y_yv_ya", "y", "y::model")
# Mappings for use with aggregate, select, etc.
c.add("t::transport agg", quote(dict(t=t_groups)))
# Sum across modes, including "non-ldv"
c.add("t::transport modes 0", quote(dict(t=list(t_groups.keys()))))
# Sum across modes, excluding "non-ldv"
c.add(
"t::transport modes 1",
quote(dict(t=list(filter(lambda k: k != "non-ldv", t_groups.keys())))),
)
for id, techs in t_groups.items():
c.add(f"t::transport {id}", quote(dict(t=techs)))
c.add("t::transport all", quote(dict(t=technologies)))
# Mappings for use with IEA Extended World Energy Balances data
c.add("groups::iea eweb", "groups_iea_eweb", "t::transport")
# Unpack
c.add("groups::iea to transport", itemgetter(0), "groups::iea eweb")
c.add("groups::transport to iea", itemgetter(1), "groups::iea eweb")
c.add("indexers::iea to transport", itemgetter(2), "groups::iea eweb")
[docs]@minimum_version("message_ix 3.8")
def get_computer(
context: Context,
obj: Optional[Computer] = None,
*,
visualize: bool = True,
**kwargs,
) -> Computer:
"""Return a :class:`genno.Computer` set up for model-building calculations."""
from . import operator
# Configure
config = Config.from_context(context, **kwargs)
# Structure information for the base model
scenario = kwargs.get("scenario")
if scenario:
config.base_model_info = ScenarioInfo(scenario)
config.with_scenario = True
config.with_solution = scenario.has_solution()
else:
base_spec = bare.get_spec(context)
config.base_model_info = base_spec["add"]
config.with_scenario = config.with_solution = False
# Create a Computer
c = obj or Computer()
# Require modules with operators
c.require_compat("ixmp.report.operator")
c.require_compat("message_ix.report.operator")
c.require_compat("message_ix_models.report.operator")
c.require_compat(operator)
# Transfer data from `context` to "config" in the genno graph
for k, v in {
"regions": context.model.regions,
"transport": context.transport,
"data source": dict(),
"output_dir": context.get_local_path(),
}.items():
c.graph["config"].setdefault(k, v)
# Attach the context and scenario
c.add("context", context)
c.add("scenario", scenario)
# Add a computation that is an empty list.
# Individual modules's prepare_computer() functions can append keys.
c.add("add transport data", [])
# Add structure-related keys
add_structure(c)
# Add exogenous data
add_exogenous_data(c, config.base_model_info)
# For each module in transport.Config.modules, invoke the function
# prepare_computer() to add further calculations
for name in context.transport.modules:
module = import_module(name if "." in name else f"..{name}", __name__)
module.prepare_computer(c)
# Add tasks for debugging the build
add_debug(c)
if visualize:
path = context.get_local_path("transport", "build.svg")
path.parent.mkdir(exist_ok=True)
c.visualize(filename=path, key="add transport data")
log.info(f"Visualization written to {path}")
return c
[docs]def main(
context: Context,
scenario: Scenario,
options: Optional[Dict] = None,
**option_kwargs,
):
"""Build MESSAGEix-Transport on `scenario`.
See also
--------
add_data
apply_spec
get_spec
"""
from .emission import strip_emissions_data
from .util import sum_numeric
# Check arguments
options = dict() if options is None else options.copy()
dupe = set(options.keys()) & set(option_kwargs.keys())
if len(dupe):
raise ValueError(f"Option(s) {repr(dupe)} appear in both `options` and kwargs")
options.update(option_kwargs)
# Use fast=True by default
options.setdefault("fast", True)
dry_run = options.pop("dry_run", False)
log.info("Configure MESSAGEix-Transport")
mark_time()
# Set up a Computer for input data calculations. This also:
# - Creates a Config instance
# - Generates and stores context.transport.spec, i.e the specification of the
# MESSAGEix-Transport structure: required, added, and removed set items
# - Prepares the "add transport data" key used below
c = get_computer(context, scenario=scenario, options=options)
def _add_data(s, **kw):
assert s is c.graph["scenario"]
result = c.get("add transport data")
# For calls to add_par_data(), int() are returned with number of observations
log.info(f"Added {sum_numeric(result)} total obs")
if dry_run:
return c.get("transport build debug")
# First strip existing emissions data
strip_emissions_data(scenario, context)
# Apply the structural changes AND add the data
log.info("Build MESSAGEix-Transport")
build.apply_spec(scenario, context.transport.spec, data=_add_data, **options)
# Required for time series data from genno reporting that is expected by legacy
# reporting
# TODO Include this in the spec, while not using it as a value for `node_loc`
scenario.platform.add_region(f"{context.model.regions}_GLB", "region", "World")
mark_time()
scenario.set_as_default()
log.info(f"Built {scenario.url} and set as default version")
return scenario