Source code for message_ix_models.report.operator

"""Atomic reporting operations for MESSAGEix-GLOBIOM."""

import itertools
import logging
import re
from collections.abc import Mapping
from typing import TYPE_CHECKING, Any, Optional, Union

import ixmp
import pandas as pd
from genno import Quantity
from genno.core.operator import Operator
from genno.operator import pow
from iam_units import convert_gwp
from iam_units.emissions import SPECIES

from message_ix_models import Context
from message_ix_models.util import add_par_data, nodes_ex_world

if TYPE_CHECKING:
    from pathlib import Path

    from genno import Computer, Key
    from genno.types import AnyQuantity
    from sdmx.model.v21 import Code

log = logging.getLogger(__name__)

__all__ = [
    "add_par_data",
    "codelist_to_groups",
    "compound_growth",
    "exogenous_data",
    "filter_ts",
    "from_url",
    "get_ts",
    "gwp_factors",
    "make_output_path",
    "model_periods",
    "nodes_ex_world",
    "quantity_from_iamc",
    "remove_ts",
    "share_curtailment",
]


[docs] def codelist_to_groups( codes: list["Code"], dim: str = "n" ) -> Mapping[str, Mapping[str, list[str]]]: """Convert `codes` into a mapping from parent items to their children. The returned value is suitable for use with :func:`genno.operator.aggregate`. If this is a list of nodes per :func:`.get_codes`, then the mapping is from regions to the ISO 3166-1 alpha-3 codes of the countries within each region. The code for the region itself is also included in the values to be aggregated, so that already- aggregated data will pass through. """ groups = dict() for code in filter(lambda c: len(c.child), codes): groups[code.id] = [code.id] + list(map(str, code.child)) return {dim: groups}
[docs] def compound_growth(qty: Quantity, dim: str) -> Quantity: """Compute compound growth along `dim` of `qty`.""" # Compute intervals along `dim` # The value at index d is the duration between d and the next index d+1 c = qty.coords[dim] dur = (c - c.shift({dim: 1})).fillna(0).shift({dim: -1}).fillna(0) # - Raise the values of `qty` to the power of the duration. # - Compute cumulative product along `dim` from the first index. # - Shift, so the value at index d is the growth relative to the prior index d-1 # - Fill in 1.0 for the first index. return pow(qty, Quantity(dur)).cumprod(dim).shift({dim: 1}).fillna(1.0)
@Operator.define() def exogenous_data(): """No action. This exists to connect :func:`.exo_data.prepare_computer` to :meth:`genno.Computer.add`. """ pass # pragma: no cover @exogenous_data.helper def add_exogenous_data( func, c: "Computer", *, context=None, source=None, source_kw=None ) -> tuple["Key", ...]: """Prepare `c` to compute exogenous data from `source`.""" from message_ix_models.tools.exo_data import prepare_computer return prepare_computer( context or Context.get_instance(-1), c, source=source, source_kw=source_kw )
[docs] def filter_ts(df: pd.DataFrame, expr: re.Pattern, *, column="variable") -> pd.DataFrame: """Filter time series data in `df`. 1. Keep only rows in `df` where `expr` is a full match ( :meth:`~pandas.Series.str.fullmatch`) for the entry in `column`. 2. Retain only the first match group ("...(...)...") from `expr` as the `column` entry. """ return df[df[column].str.fullmatch(expr)].assign( variable=df[column].str.replace(expr, r"\1", regex=True) )
[docs] def get_ts( scenario: ixmp.Scenario, filters: Optional[dict] = None, iamc: bool = False, subannual: Union[bool, str] = "auto", ): """Retrieve timeseries data from `scenario`. Corresponds to :meth:`ixmp.Scenario.timeseries`. .. todo:: Move upstream, e.g. to :mod:`ixmp` alongside :func:`.store_ts`. """ filters = filters or dict() return scenario.timeseries(iamc=iamc, subannual=subannual, **filters)
[docs] def gwp_factors() -> Quantity: """Use :mod:`iam_units` to generate a Quantity of GWP factors. The quantity is dimensionless, e.g. for converting [mass] to [mass], andhas dimensions: - 'gwp metric': the name of a GWP metric, e.g. 'SAR', 'AR4', 'AR5'. All metrics are on a 100-year basis. - 'e': emissions species, as in MESSAGE. The entry 'HFC' is added as an alias for the species 'HFC134a' from iam_units. - 'e equivalent': GWP-equivalent species, always 'CO2'. """ dims = ["gwp metric", "e", "e equivalent"] metric = ["SARGWP100", "AR4GWP100", "AR5GWP100"] species_to = ["CO2"] # Add to this list to perform additional conversions data = [] for m, s_from, s_to in itertools.product(metric, SPECIES, species_to): # Get the conversion factor from iam_units factor = convert_gwp(m, (1, "kg"), s_from, s_to).magnitude # MESSAGEix-GLOBIOM uses e='HFC' to refer to this species if s_from == "HFC134a": s_from = "HFC" # Store entry data.append((m[:3], s_from, s_to, factor)) # Convert to Quantity object and return return Quantity( pd.DataFrame(data, columns=dims + ["value"]).set_index(dims)["value"].dropna() )
[docs] def make_output_path(config: Mapping, name: Union[str, "Path"]) -> "Path": """Return a path under the "output_dir" Path from the reporter configuration.""" return config["output_dir"].joinpath(name)
[docs] def model_periods(y: list[int], cat_year: pd.DataFrame) -> list[int]: """Return the elements of `y` beyond the firstmodelyear of `cat_year`. .. todo:: Move upstream, to :mod:`message_ix`. """ y0 = cat_year.query("type_year == 'firstmodelyear'")["year"].item() return list(filter(lambda year: y0 <= year, y))
[docs] def remove_ts( scenario: ixmp.Scenario, config: Optional[dict] = None, after: Optional[int] = None, dump: bool = False, ) -> None: """Remove all time series data from `scenario`. Note that data stored with :meth:`.add_timeseries` using :py:`meta=True` as a keyword argument cannot be removed using :meth:`.TimeSeries.remove_timeseries`, and thus also not with this operator. .. todo:: Move upstream, to :mod:`ixmp` alongside :func:`.store_ts`. """ if dump: raise NotImplementedError data = scenario.timeseries().drop("value", axis=1) N = len(data) count = f"{N}" if after: query = f"{after} <= year" data = data.query(query) count = f"{len(data)} of {N} ({query})" log.info(f"Remove {count} rows of time series data from {scenario.url}") # TODO improve scenario.transact() to allow timeseries_only=True; use here scenario.check_out(timeseries_only=True) try: scenario.remove_timeseries(data) except Exception: scenario.discard_changes() else: scenario.commit(f"Remove time series data ({__name__}.remove_all_ts)")
# Non-weak references to objects to keep them alive _FROM_URL_REF: set[Any] = set()
[docs] def from_url(url: str, cls=ixmp.TimeSeries) -> ixmp.TimeSeries: """Return a :class:`ixmp.TimeSeries` or subclass instance, given its `url`. .. todo:: Move upstream, to :mod:`ixmp.report`. Parameters ---------- cls : type, optional Subclass to instantiate and return; for instance, :class:`.Scenario`. """ ts, mp = cls.from_url(url) assert ts is not None _FROM_URL_REF.add(ts) _FROM_URL_REF.add(mp) return ts
[docs] def quantity_from_iamc(qty: "AnyQuantity", variable: str) -> "AnyQuantity": """Extract data for a single measure from `qty` with (at least) dimensions v, u. .. todo:: Move upstream, to either :mod:`ixmp` or :mod:`genno`. Parameters ---------- variable : str Regular expression to match the ``v`` dimension of `qty`. """ from genno.operator import relabel, select expr = re.compile(variable) variables, replacements = [], {} for var in qty.coords["v"].data: if match := expr.fullmatch(var): variables.append(match.group(0)) replacements[match.group(0)] = match.group(1) subset = qty.pipe(select, {"v": variables}).pipe(relabel, {"v": replacements}) unique_units = subset.coords["Unit"].data assert 1 == len(unique_units) subset.units = unique_units[0] return subset.sel(Unit=unique_units[0], drop=True)
# commented: currently unused # def share_cogeneration(fraction, *parts): # """Deducts a *fraction* from the first of *parts*.""" # return parts[0] - (fraction * sum(parts[1:]))
[docs] def share_curtailment(curt, *parts): """Apply a share of *curt* to the first of *parts*. If this is being used, it usually will indicate the need to split *curt* into multiple technologies; one for each of *parts*. """ return parts[0] - curt * (parts[0] / sum(parts))