Source code for message_ix_models.util.scenarioinfo

""":class:`.ScenarioInfo` class."""

import logging
import re
from collections import defaultdict
from dataclasses import InitVar, dataclass, field
from itertools import product
from typing import TYPE_CHECKING, Optional

import pandas as pd
import pint
import sdmx.model.v21 as sdmx_model

from .ixmp import parse_url

if TYPE_CHECKING:
    from message_ix import Scenario

log = logging.getLogger(__name__)


# TODO: use kw_only=True once python 3.10 is oldest supported version
# FIXME the .. autosummary part does not render correctly in VSCode preview
[docs] @dataclass() class ScenarioInfo: """Information about a :class:`.Scenario` object. Code that prepares data for a target Scenario can accept a ScenarioInfo instance. This avoids the need to create or load an actual Scenario or its data, which can be a performance-limiting step. ScenarioInfo objects can also be used (for instance, by :func:`.apply_spec`) to describe the contents of a Scenario *before* it is created. ScenarioInfo objects have the following convenience attributes: .. autosummary:: ~ScenarioInfo.set io_units is_message_macro N units_for Y y0 yv_ya Parameters ---------- scenario_obj : message_ix.Scenario If given, :attr:`.set` is initialized from this existing scenario. Examples -------- Iterating over an instance gives "model", "scenario", "version" and the values of the respective attributes: >>> si = ScenarioInfo.from_url("model name/scenario name#123") >>> dict(si) {'model': 'model name', 'scenario': 'scenario name', 'version': 123} See also -------- .Spec """ # TODO: give this field kw_only=False once python 3.10 is the minimum version # Parameters for initialization only scenario_obj: InitVar[Optional["Scenario"]] = field(default=None) empty: InitVar[bool] = False platform_name: Optional[str] = None #: Model name; equivalent to :attr:`.TimeSeries.model`. model: Optional[str] = None #: Scenario name; equivalent to :attr:`.TimeSeries.scenario`. scenario: Optional[str] = None #: Scenario version; equivalent to :attr:`.TimeSeries.version`. version: Optional[int] = None #: Elements of :mod:`ixmp`/:mod:`message_ix` sets. set: dict[str, list] = field(default_factory=lambda: defaultdict(list)) #: Elements of :mod:`ixmp`/:mod:`message_ix` parameters. par: dict[str, pd.DataFrame] = field(default_factory=dict) #: First model year, if set, else ``Y[0]``. y0: int = -1 #: :obj:`True` if a MESSAGE-MACRO scenario. is_message_macro: bool = False _yv_ya: Optional[pd.DataFrame] = None def __post_init__(self, scenario_obj: Optional["Scenario"], empty: bool): if not scenario_obj: return self.model = scenario_obj.model self.scenario = scenario_obj.scenario self.version = ( None if scenario_obj.version is None else int(scenario_obj.version) ) if empty: return # Copy structure (set contents) for name in scenario_obj.set_list(): value = scenario_obj.set(name) try: self.set[name] = value.tolist() except AttributeError: self.set[name] = value # pd.DataFrame for ≥2-D set; don't convert # Copy data for a limited set of parameters for name in ("duration_period",): self.par[name] = scenario_obj.par(name) self.is_message_macro = "PRICE_COMMODITY" in scenario_obj.par_list() # Computed once fmy = scenario_obj.cat("year", "firstmodelyear") self.y0 = int(fmy[0]) if len(fmy) else self.set["year"][0] self._yv_ya = scenario_obj.vintage_and_active_years()
[docs] @classmethod def from_url(cls, url: str) -> "ScenarioInfo": """Create an instance using only an :attr:`url`.""" result = cls() result.url = url return result
@property def yv_ya(self): """:class:`pandas.DataFrame` with valid ``year_vtg``, ``year_act`` pairs.""" if self._yv_ya is None: # - Cartesian product of all yv and ya. # - Convert to data frame. # - Filter only valid years. self._yv_ya = ( pd.DataFrame( product(self.set["year"], self.set["year"]), columns=["year_vtg", "year_act"], ) .query("@self.y0 <= year_vtg <= year_act") .reset_index(drop=True) ) return self._yv_ya @property def N(self): """Elements of the set 'node'. See also -------- .nodes_ex_world """ return list(map(str, self.set["node"])) @property def Y(self) -> list[int]: """Elements of the set 'year' that are >= the first model year.""" return list(filter(lambda y: y >= self.y0, self.set["year"])) @property def url(self) -> str: """Identical to :attr:`.TimeSeries.url`.""" return f"{self.model}/{self.scenario}#{self.version}" @url.setter def url(self, value): p, s = parse_url(value) self.platform_name = p.get("name") for k in "model", "scenario", "version": setattr(self, k, s.get(k)) _path_re = [ (re.compile(r"[/<>:\"\\\|\?\*]+"), "_"), (re.compile("#"), "_v"), (re.compile("__+"), "_"), ] @property def path(self) -> str: """A valid file system path name similar to :attr:`url`. Characters invalid in Windows paths are replaced with "_". """ from functools import reduce return reduce(lambda s, e: e[0].sub(e[1], s), self._path_re, self.url)
[docs] def update(self, other: "ScenarioInfo"): """Update with the set elements of `other`.""" for name, data_list in other.set.items(): self.set[name].extend( filter(lambda id: id not in self.set[name], data_list) ) for name, data_frame in other.par.items(): log.warning(f"Not implemented: merging parameter data for {name!r}")
def __iter__(self): for k in "model", "scenario", "version": yield (k, getattr(self, k)) def __or__(self, other) -> "ScenarioInfo": if not isinstance(other, ScenarioInfo): return NotImplemented result = ScenarioInfo() result.update(self) result.update(other) return result def __repr__(self): return ( f"<ScenarioInfo: {sum(len(v) for v in self.set.values())} code(s) in " f"{len(self.set)} set(s)>" )
[docs] def units_for(self, set_name: str, id: str) -> pint.Unit: """Return the units associated with code `id` in MESSAGE set `set_name`. :mod:`ixmp` (or the sole :class:`~ixmp.backend.base.JDBCBackend`, as of v3.5.0) does not handle unit information for variables and equations (unlike parameter values), such as MESSAGE decision variables ``ACT``, ``CAP``, etc. In :mod:`message_ix_models` and :mod:`message_data`, the following conventions are (generally) followed: - The units of ``ACT`` and others are consistent for each ``technology``. - The units of ``COMMODITY_BALANCE``, ``STOCK``, ``commodity_stock``, etc. are consistent for each ``commodity``. Thus, codes for elements of these sets (e.g. :ref:`commodity-yaml`) can be used to carry the standard units for the corresponding quantities. :func:`units_for` retrieves these units, for use in model-building and reporting. .. todo:: Expand this discussion and transfer to the :mod:`message_ix` docs. See also -------- io_units """ try: idx = self.set[set_name].index(id) except ValueError: print(self.set[set_name]) raise code = self.set[set_name][idx] try: return code.eval_annotation( id="units", globals=dict(registry=pint.get_application_registry()) ) except AttributeError: raise TypeError(f"{set_name!s} element {code!r} is str, not Code")
[docs] def io_units( self, technology: str, commodity: str, level: Optional[str] = None ) -> pint.Unit: """Return units for the MESSAGE ``input`` or ``output`` parameter. These are implicitly determined as the ratio of: - The units for the origin (for ``input``) or destination `commodity`, per :meth:`.units_for`. - The units of activity for the `technology`. Parameters ---------- level : str Placeholder for future functionality, i.e. to use different units per (commodity, level). Currently ignored. If given, a debug message is logged. Raises ------ ValueError if either `technology` or `commodity` lack defined units. """ if level is not None: log.debug(f"{level = } ignored") c = self.units_for("commodity", commodity) t = self.units_for("technology", technology) if None in (c, t): raise ValueError( "Cannot compute input/output units for: " f"commodity={commodity!r} [{c}] / technology={technology!r} [{t}]" ) return c / t
[docs] def substitute_codes(self) -> None: """Update the members of :attr:`set` using :func:`.get_codelist`. Members of the following set(s) that are :class:`str` are replaced with codes loaded from the corresponding code list(s), including all annotations: - ``commodity`` .. todo:: Extend for other sets and cases where there are multiple code lists (for instance ``year``, ``region``). """ from message_ix_models.model.structure import get_codelist for cl_id in ("commodity",): cl = get_codelist(cl_id) for i, value in enumerate(self.set[cl_id]): if isinstance(value, str): try: self.set[cl_id][i] = cl[value] except KeyError: pass
[docs] def year_from_codes(self, codes: list[sdmx_model.Code]): """Update using a list of `codes`. The following are updated: - :attr:`.set` ``year`` - :attr:`.set` ``cat_year``, with the first model year. - :attr:`.par` ``duration_period`` Any existing values are discarded. After this, the attributes :attr:`.y0` and :attr:`.Y` give the first model year and model years, respectively. Examples -------- Get a particular code list, create a ScenarioInfo instance, and update using the codes: >>> years = get_codes("year/A") >>> info = ScenarioInfo() >>> info.year_from_codes(years) Use populated values: >>> info.y0 2020 >>> info.Y[:3] [2020, 2030, 2040] >>> info.Y[-3:] [2090, 2100, 2110] """ # Clear existing values if len(self.set["year"]): log.debug(f"Discard existing 'year' elements: {repr(self.set['year'])}") self.set["year"] = [] if len(self.set["cat_year"]): log.debug( f"Discard existing 'cat_year' elements: {repr(self.set['cat_year'])}" ) self.set["cat_year"] = [] if "duration_period" in self.par: log.debug("Discard existing 'duration_period' elements") fmy_set = False duration_period: list[dict] = [] # TODO use sorted() here once supported by sdmx for code in codes: year = int(code.id) # Store the year self.set["year"].append(year) # Check for an annotation 'firstmodelyear: true' if code.eval_annotation(id="firstmodelyear"): if fmy_set: # No coverage: data that triggers this should not be committed raise ValueError( # pragma: no cover "≥2 periods are annotated firstmodelyear: true" ) self.y0 = year self.set["cat_year"].append(("firstmodelyear", year)) fmy_set = True # Store the duration_period: either from an annotation, or computed vs. the # prior period duration_period.append( dict( year=year, value=code.eval_annotation(id="duration_period") or (year - duration_period[-1]["year"]), unit="y", ) ) # Store self.par["duration_period"] = pd.DataFrame(duration_period)
[docs] @dataclass class Spec: """A specification for the structure of a model or variant. A Spec collects 3 :class:`.ScenarioInfo` instances at the attributes :attr:`.add`, :attr:`.remove`, and :attr:`.require`. This is the type that is accepted by :func:`.apply_spec`; :doc:`model-build` describes how a Spec is used to modify a :class:`.Scenario`. A Spec may also be used to express information about the target structure of data to be prepared; like ScenarioInfo, this can happen before the target Scenario exists. Spec also provides: - Dictionary-style access, e.g. ``s["add"]`` is equivalent to ``s.add.``. - Error checking; setting keys other than add/remove/require results in an error. - :meth:`.merge`, a helper method. """ #: Structure to be added to a base scenario. add: ScenarioInfo = field(default_factory=ScenarioInfo) #: Structure to be removed from a base scenario. remove: ScenarioInfo = field(default_factory=ScenarioInfo) #: Structure that must be present in a base scenario. require: ScenarioInfo = field(default_factory=ScenarioInfo) # Dict-like features def __getitem__(self, key): try: return getattr(self, key) except AttributeError: raise KeyError(key) def __setitem__(self, key, value: ScenarioInfo): if not hasattr(self, key): raise KeyError(key) setattr(self, key, value) def values(self): yield self.add yield self.remove yield self.require # Static methods
[docs] @staticmethod def merge(a: "Spec", b: "Spec") -> "Spec": """Merge Specs `a` and `b` together. Returns a new Spec where each member is a union of the respective members of `a` and `b`. """ result = Spec() for key in {"add", "remove", "require"}: result[key].update(a[key]) result[key].update(b[key]) return result