Source code for message_ix_models.model.transport.build

"""Build MESSAGEix-Transport on a base model."""

import logging
from functools import partial
from importlib import import_module
from operator import itemgetter
from pathlib import Path
from typing import TYPE_CHECKING, Any, Optional

import genno
import pandas as pd
from genno import Computer, KeyExistsError, quote
from message_ix import Scenario

from message_ix_models import Context, ScenarioInfo
from message_ix_models.model import bare, build
from message_ix_models.model.structure import get_codelist
from message_ix_models.util import minimum_version
from message_ix_models.util._logging import mark_time
from message_ix_models.util.graphviz import HAS_GRAPHVIZ

from . import Config
from .structure import get_technology_groups

if TYPE_CHECKING:
    import pathlib


log = logging.getLogger(__name__)


[docs] def add_debug(c: Computer) -> None: """Add tasks for debugging the build.""" from genno import Key, KeySeq from .key import gdp_cap, ms, pdt_nyt context: Context = c.graph["context"] config: Config = context.transport # Path to output file if config.with_scenario and config.with_solution: # Output to a directory corresponding to the Scenario URL label = c.graph["scenario"].url.replace("/", "_") else: # Output to a directory name constructed from settings # Remove ":" to be compatible with actions/upload-artifact ssp = str(config.ssp).replace(":", "_") label = f"{ssp}-{context.model.regions}-{context.model.years}" output_dir = context.get_local_path("transport", f"debug-{label}") output_dir.mkdir(exist_ok=True, parents=True) # Store in the config, but not at "output_dir" that is used by e.g. reporting c.graph["config"]["transport build debug dir"] = output_dir # FIXME Duplicated from base.prepare_reporter() e_iea = Key("energy:n-y-product-flow:iea") e_fnp = KeySeq(e_iea.drop("y")) e_cnlt = Key("energy:c-nl-t:iea+0") # Transform IEA EWEB data for comparison c.add(e_fnp[0], "select", e_iea, indexers=dict(y=2020), drop=True) c.add(e_fnp[1], "aggregate", e_fnp[0], "groups::iea to transport", keep=False) c.add(e_cnlt, "rename_dims", e_fnp[1], quote(dict(flow="t", n="nl", product="c"))) # Write some intermediate calculations from the build process to file debug_keys = [] for i, (key, stem) in enumerate( ( (gdp_cap, "gdp-ppp-cap"), (pdt_nyt, "pdt"), (pdt_nyt + "capita+post", "pdt-cap"), (ms, "mode-share"), (e_fnp[0], "energy-iea-0"), (e_cnlt, "energy-iea-1"), ) ): debug_keys.append(f"transport debug {i}") c.add( debug_keys[-1], "write_report_debug", key, output_dir.joinpath(f"{stem}.csv"), ) def _(*args) -> "pathlib.Path": """Do nothing with the computed `args`, but return `output_path`.""" return output_dir debug_plots = ( "demand-exo demand-exo-capita demand-exo-capita-gdp inv_cost" # FIXME The following currently don't work, as their required/expected input # keys (from the post-solve/report step) do not exist in the build step # " var-cost fix-cost" ).split() c.add( "transport build debug", _, # NB To omit some or all of these calculations / plots from the debug outputs # for individuals, comment 1 or both of the following lines *debug_keys, *[f"plot {p}" for p in debug_plots], ) # log.info(c.describe("transport build debug")) # Also generate these debugging outputs when building the scenario c.graph["add transport data"].append("transport build debug")
[docs] def debug_multi(context: Context, *paths: Path) -> None: """Generate plots comparing data from multiple build debug directories.""" from .plot import ComparePDT, ComparePDTCap0, ComparePDTCap1 if isinstance(paths[0], Scenario): # Workflow was called with --from="…", so paths from the previous step are not # available; try to guess paths = sorted( filter( Path.is_dir, context.get_local_path("transport").glob("debug-ICONICS_*") ) ) c = Computer(config={"transport build debug dir": paths[0].parent}) c.require_compat("message_ix_models.report.operator") for cls in (ComparePDT, ComparePDTCap0, ComparePDTCap1): key = c.add(f"compare {cls.basename}", cls, *paths) c.get(key)
[docs] def add_exogenous_data(c: Computer, info: ScenarioInfo) -> None: """Add exogenous data to `c` that mocks data coming from an actual Scenario. The specific quantities added are: - ``GDP:n-y``, from GEA, SSP, or SHAPE data; see :func:`.gdp_pop`. - ``PRICE_COMMODITY:n-c-y``, currently mocked based on the shape of ``GDP:n-y`` using :func:`.dummy_prices`. .. todo:: Add an external data source. - ``MERtoPPP:n-y``, from :file:`mer-to-ppp.csv`. If ``context.model.regions`` is “R14”, data are adapted from R11 using :obj:`.adapt_R11_R14`. See also -------- :doc:`/reference/model/transport/input` """ # Ensure that the SSPOriginal and SSPUpdate data providers are available import message_ix_models.project.advance.data # noqa: F401 import message_ix_models.project.ssp.data # noqa: F401 import message_ix_models.tools.iea.web # noqa: F401 from message_ix_models.project.ssp import SSP_2017, SSP_2024 from message_ix_models.tools.exo_data import prepare_computer # Ensure that the MERtoPPP data provider is available from . import ( data, # noqa: F401 key, ) from .files import FILES, add # Added keys keys = {} context = c.graph["context"] config: "Config" = c.graph["config"]["transport"] # Identify appropriate source keyword arguments for loading GDP and population data source = str(config.ssp) if config.ssp in SSP_2017: source_kw: tuple[dict[str, Any], ...] = ( dict(measure="GDP", model="IIASA GDP"), dict(measure="POP", model="IIASA GDP"), ) elif config.ssp in SSP_2024: source_kw = ( dict(measure="GDP", model="IIASA GDP 2023"), dict(measure="POP"), ) for kw in source_kw: keys[kw["measure"]] = prepare_computer( context, c, source, source_kw=kw, strict=False ) # Add data for MERtoPPP kw = dict(measure="MERtoPPP", nodes=context.model.regions) prepare_computer(context, c, "transport MERtoPPP", source_kw=kw, strict=False) # Add IEA Extended World Energy Balances data; select only the flows related to # transport kw = dict( provider="IEA", edition="2024", flow=( "DOMESAIR DOMESNAV PIPELINE RAIL ROAD TOTTRANS TRNONSPE WORLDAV WORLDMAR" ).split(), ) prepare_computer(context, c, "IEA_EWEB", source_kw=kw, strict=False) # Add IEA Future of Trucks data for kw in dict(measure=1), dict(measure=2): prepare_computer(context, c, "IEA Future of Trucks", source_kw=kw, strict=False) # Add ADVANCE data common = dict(model="MESSAGE", scenario="ADV3TRAr2_Base", aggregate=False) for n, m, u in ( ("pdt ldv", "Transport|Service demand|Road|Passenger|LDV", "Gp km / a"), ("fv", "Transport|Service demand|Road|Freight", "Gt km"), ): # Add the base data kw = dict(measure=m, name=f"advance {n}") kw.update(common) k, *_ = prepare_computer(context, c, "ADVANCE", source_kw=kw, strict=False) # Broadcast to R12 c.add(f"{n}:n:advance", "broadcast_advance", k, "y0", "config") # Alias for other computations which expect the upper-case name c.add("MERtoPPP:n-y", "mertoppp:n-y") # FIXME Ensure the latter case for a simulated solution # if key.GDP in c: if False: pass # Solved scenario that already has this key else: c.add(key.GDP, keys["GDP"][0]) # Ensure correct units c.add("population:n-y", "mul", "pop:n-y", genno.Quantity(1.0, units="passenger")) # FIXME Adjust to derive PRICE_COMMODITY c=transport from solved scenario with # MESSAGEix-Transport detail, then uncomment the following line # if key.price.base - "transport" in c: if False: # Alias PRICE_COMMODITY:… to PRICE_COMMODITY:*:transport, e.g. solved scenario # that already has this key c.add(key.price[0], key.price.base - "transport") else: # Not solved scenario → dummy prices c.add(key.price[0], "dummy_prices", keys["GDP"][0], sums=True) # Data from files # Identify the mode-share file according to the config setting add( key="mode share:n-t:exo", path=("mode-share", config.mode_share), name="Reference (base year) mode share", units="dimensionless", replace=True, ) for f in FILES: c.add("", f, context=context)
#: :mod:`genno` tasks for model structure information that are 'static'—that is, do not #: change based on :class:`~.transport.config.Config` settings. See #: :func:`add_structure`. #: #: These include: #: #: - ``info``: :attr:`transport.Config.base_model_info #: <transport.config.Config.base_model_info>`, an instance of :class:`.ScenarioInfo`. #: - ``transport info``: the logical union of #: :attr:`~.transport.config.Config.base_model_info` and the :attr:`.Spec.add` member #: of :attr:`Config.spec <.transport.config.Config.spec>`. This includes #: all set elements that will be present in the build model. #: - ``dry_run``: :attr:`.Config.dry_run`. #: - ``e::codelist``: :func:`.get_codelist` for :ref:`emission-yaml`. #: - ``groups::iea to transport``, ``groups::transport to iea``, ``indexers::iea to #: transport``: the 3 outputs of :func:`.groups_iea_eweb`, for use with IEA Extended #: World Energy Balances data. #: - ``n::ex world``: |n| as :class:`list` of :class:`str`, excluding "World". See #: :func:`.nodes_ex_world`. #: - ``n::ex world+code``: |n| as :class:`list`` of :class:`.Code`, excluding "World". #: - ``n:n:ex world``: a 1-dimensional :class:`.Quantity` for broadcasting (values all #: 1). #: - ``nl::world agg``: :class:`dict` mapping to aggregate "World" from individual |n|. #: See :func:`.nodes_world_agg`. STRUCTURE_STATIC = ( ("info", lambda c: c.transport.base_model_info, "context"), ( "transport info", lambda c: c.transport.base_model_info | c.transport.spec.add, "context", ), ("dry_run", lambda c: c.core.dry_run, "context"), ("e::codelist", partial(get_codelist, "emission")), ("groups::iea eweb", "groups_iea_eweb", "t::transport"), ("groups::iea to transport", itemgetter(0), "groups::iea eweb"), ("groups::transport to iea", itemgetter(1), "groups::iea eweb"), ("indexers::iea to transport", itemgetter(2), "groups::iea eweb"), ("n::ex world", "nodes_ex_world", "n"), ( "n:n:ex world", lambda n: genno.Quantity([1.0] * len(n), coords={"n": n}), "n::ex world", ), ("n::ex world+code", "nodes_ex_world", "nodes"), ("nl::world agg", "nodes_world_agg", "config"), )
[docs] def add_structure(c: Computer) -> None: """Add tasks to `c` for structures required by :mod:`.transport.build`. These include: - The following keys *only* if not already present in `c`. If, for example, `c` is a :class:`.Reporter` prepared from an already-solved :class:`.Scenario`, the existing tasks referring to the Scenario contents are not changed. - ``n``: |n| as :class:`list` of :class:`str`. - ``y``: |y| in the base model. - ``cat_year``: simulated data structure for "cat_year" with at least 1 row :py:`("firstmodelyear", y0)`. - ``y::model``: |y| within the model horizon as :class:`list` of :class:`int`. - ``y0``: The first model period, :class:`int`. - All tasks from :data:`STRUCTURE_STATIC`. - ``c::transport``: the |c| set of the :attr:`~.Spec.add` member of :attr:`Config.spec <.transport.config.Config.spec>`, transport commodities to be added. - ``c::transport+base``: all |c| that will be present in the build model - ``cg``: "consumer group" set elements. - ``indexers:cg``: ``cg`` as indexers. - ``nodes``: |n| in the base model. - ``indexers:scenario``: :class:`dict` mapping "scenario" to the short form of :attr:`Config.ssp <.transport.config.Config.ssp>` (for instance, "SSP1"), for indexing. - ``t::transport``: all transport |t| to be added, :class:`list`. - ``t::transport agg``: :class:`dict` mapping "t" to the output of :func:`.get_technology_groups`. For use with operators like 'aggregate', 'select', etc. - ``t::transport all``: :class:`dict` mapping "t" to ``t::transport``. .. todo:: Choose a more informative key. - ``t::transport modes``: :attr:`Config.demand_modes <.transport.config.Config.demand_modes>`. - ``t::transport modes 0``: :class:`dict` mapping "t" to the keys only from ``t::transport agg``. Use with 'aggregate' to produce the sum across modes, including "non-LDV". - ``t::transport modes 1``: same as ``t::transport modes 0`` except excluding "non-ldv". - ``t::RAIL`` etc.: transport |t| in the "RAIL" mode/group as :class:`list` of :class:`str`. See :func:`.get_technology_groups`. - ``t::transport RAIL`` etc.: :class:`dict` mapping "t" to the elements of ``t::RAIL``. - All of the keys in :data:`.bcast_tcl` and :data:`.bcast_y`. """ from ixmp.report import configure from . import key from .operator import broadcast_t_c_l, broadcast_y_yv_ya # Retrieve configuration and other information config: "Config" = c.graph["context"].transport # .model.transport.Config object info = config.base_model_info # ScenarioInfo describing the base scenario spec = config.spec # Specification for MESSAGEix-Transport structure to be built t_groups = get_technology_groups(spec) # Technology groups/hierarchy # Update RENAME_DIMS with transport-specific concepts/dimensions. This allows to use # genno.operator.load_file(…, dims=RENAME_DIMS) in add_exogenous_data() # TODO Read from a concept scheme or list of dimensions configure( rename_dims={ "area_type": "area_type", "attitude": "attitude", "census_division": "census_division", "consumer_group": "cg", "driver_type": "driver_type", "vehicle_class": "vehicle_class", } ) # Tasks only to be added if not already present in `c`. These must be done # separately because add_queue does not support the strict/pass combination. for task in ( ("n", quote(list(map(str, info.set["node"])))), ("y", quote(info.set["year"])), ( "cat_year", pd.DataFrame([["firstmodelyear", info.y0]], columns=["type_year", "year"]), ), (key.y, "model_periods", "y", "cat_year"), ("y0", itemgetter(0), "y::model"), ): try: c.add(*task, strict=True) except KeyExistsError: # Already present # log.debug(f"Use existing {c.describe(task[0])}") pass # Assemble a queue of tasks # - `Static` tasks # - Single 'dynamic' tasks based on config, info, spec, and/or t_groups # - Multiple static and dynamic tasks generated in loops etc. tasks = list(STRUCTURE_STATIC) + [ ("c::transport", quote(spec.add.set["commodity"])), ("c::transport+base", quote(spec.add.set["commodity"] + info.set["commodity"])), ("cg", quote(spec.add.set["consumer_group"])), ("indexers:cg", spec.add.set["consumer_group indexers"]), ("nodes", quote(info.set["node"])), ("indexers:scenario", quote(dict(scenario=repr(config.ssp).split(":")[1]))), ("t::transport", quote(spec.add.set["technology"])), ("t::transport agg", quote(dict(t=t_groups))), ("t::transport all", quote(dict(t=spec.add.set["technology"]))), (key.t_modes, quote(config.demand_modes)), ("t::transport modes 0", quote(dict(t=list(t_groups.keys())))), ( "t::transport modes 1", quote(dict(t=list(filter(lambda k: k != "non-ldv", t_groups.keys())))), ), ] # Quantities for broadcasting (t,) to (t, c, l) dimensions tasks.extend( ( getattr(key.bcast_tcl, kind), partial(broadcast_t_c_l, kind=kind, default_level="final"), "t::transport", "c::transport+base", ) for kind in ("input", "output") ) # Quantities for broadcasting y to (yv, ya) for k, base, method in ( (key.bcast_y.all, "y", "product"), # All periods (key.bcast_y.model, "y::model", "product"), # Model periods only (key.bcast_y.no_vintage, "y::model", "zip"), # Model periods with no vintaging ): tasks.append((k, partial(broadcast_y_yv_ya, method=method), base, base)) # Groups of technologies and indexers # FIXME Combine or disambiguate these keys for id, techs in t_groups.items(): tasks += [ # Indexer-form of technology groups (f"t::transport {id}", quote(dict(t=techs))), # List form of technology groups (f"t::{id}", quote(techs)), ] # - Change each task from single-tuple form to (args, kwargs) with strict=True. # - Add all to the Computer, making 2 passes. c.add_queue(map(lambda t: (t, dict(strict=True)), tasks), max_tries=2, fail="raise")
[docs] @minimum_version("message_ix 3.8") def get_computer( context: Context, obj: Optional[Computer] = None, *, visualize: bool = True, **kwargs, ) -> Computer: """Return a :class:`genno.Computer` set up for model-building calculations. The returned computer contains: - Everything added by :func:`.add_structure`, :func:`.add_exogenous_data`, and :func:`.add_debug`. - For each module in :attr:`.transport.config.Config.modules`, everything added by the :py:`prepare_computer()` function in that module. - ``context``: a reference to `context`. - ``scenario``: a reference to a Scenario, if one appears in `kwargs`. - ``add transport data``: a list of keys which, when computed, will cause all transport data to be computed and added to ``scenario``. Parameters ---------- obj : If `obj` is an existing :class:`.Computer` (or subclass, such as :class`.Reporter`), tasks are added the existing tasks in its graph. Otherwise, a new Computer is created and populated. visualize : If :any:`True` (the default), a file :file:`transport/build.svg` is written in the local data directory with a visualization of the ``add transport data`` key. """ from . import key, operator # Configure config = Config.from_context(context, **kwargs) # Structure information for the base model scenario = kwargs.get("scenario") if scenario: # Retrieve structure information from an existing base model/`scenario` config.base_model_info = ScenarioInfo(scenario) config.with_scenario = True config.with_solution = scenario.has_solution() else: # Generate a Spec/ScenarioInfo for a non-existent base model/`scenario` as # described by `context` base_spec = bare.get_spec(context) config.base_model_info = base_spec["add"] config.with_scenario = config.with_solution = False # Ensure that members of e.g. base_model_info.set["commodity"] are Code objects with # their respective annotations config.base_model_info.substitute_codes() # Create a Computer c = obj or Computer() # Require modules with operators c.require_compat("ixmp.report.operator") c.require_compat("message_ix.report.operator") c.require_compat("message_ix_models.report.operator") c.require_compat(operator) # Transfer data from `context` to "config" in the genno graph for k, v in { "regions": context.model.regions, "transport": context.transport, "data source": dict(), "output_dir": context.get_local_path(), }.items(): c.graph["config"].setdefault(k, v) # Attach the context and scenario c.add("context", context) c.add("scenario", scenario) # Add a computation that is an empty list. # Individual modules's prepare_computer() functions can append keys. c.add("add transport data", []) c.add(key.report.all, []) # Needed by .plot.prepare_computer() # Add structure-related keys add_structure(c) # Add exogenous data add_exogenous_data(c, config.base_model_info) # For each module in transport.Config.modules, invoke the function # prepare_computer() to add further calculations for name in context.transport.modules: module = import_module(name if "." in name else f"..{name}", __name__) module.prepare_computer(c) # Add tasks for debugging the build add_debug(c) if visualize and HAS_GRAPHVIZ: path = context.get_local_path("transport", "build.svg") path.parent.mkdir(exist_ok=True) c.visualize(filename=path, key="add transport data") log.info(f"Visualization written to {path}") return c
[docs] def main( context: Context, scenario: Scenario, options: Optional[dict] = None, **option_kwargs, ): """Build MESSAGEix-Transport on `scenario`. See also -------- add_data apply_spec get_spec """ from .emission import strip_emissions_data from .util import sum_numeric # Check arguments options = dict() if options is None else options.copy() dupe = set(options.keys()) & set(option_kwargs.keys()) if len(dupe): raise ValueError(f"Option(s) {repr(dupe)} appear in both `options` and kwargs") options.update(option_kwargs) # Use fast=True by default options.setdefault("fast", True) dry_run = options.pop("dry_run", False) log.info("Configure MESSAGEix-Transport") mark_time() # Set up a Computer for input data calculations. This also: # - Creates a Config instance # - Generates and stores context.transport.spec, i.e the specification of the # MESSAGEix-Transport structure: required, added, and removed set items # - Prepares the "add transport data" key used below c = get_computer(context, scenario=scenario, options=options) def _add_data(s, **kw): assert s is c.graph["scenario"] result = c.get("add transport data") # For calls to add_par_data(), int() are returned with number of observations log.info(f"Added {sum_numeric(result)} total obs") if dry_run: return c.get("transport build debug") # First strip existing emissions data strip_emissions_data(scenario, context) # Apply the structural changes AND add the data log.info("Build MESSAGEix-Transport") build.apply_spec(scenario, context.transport.spec, data=_add_data, **options) # Required for time series data from genno reporting that is expected by legacy # reporting # TODO Include this in the spec, while not using it as a value for `node_loc` scenario.platform.add_region(f"{context.model.regions}_GLB", "region", "World") mark_time() scenario.set_as_default() log.info(f"Built {scenario.url} and set as default version") return scenario