Source code for message_ix_models.report.util

import logging
from collections.abc import Iterable
from typing import Optional, Union

import pandas as pd
from dask.core import quote
from genno import Key, Quantity
from genno.compat.pyam.util import collapse as genno_collapse
from genno.core.key import single_key
from iam_units import registry
from message_ix import Reporter
from sdmx.model.v21 import Code

# Logger for this module, named after the module's import path
log = logging.getLogger(__name__)


#: Per-dimension replacements applied by :func:`collapse`.
#:
#: Outer keys are dimension (column) IDs; each inner mapping goes from a pattern to
#: its replacement. The whole structure is passed to :meth:`pandas.DataFrame.replace`
#: with ``regex=True``; see the documentation of that method.
#:
#: - Patterns are applied to the labels along each dimension.
#: - :func:`collapse` applies :meth:`str.title` to these columns *before* the
#:   replacements, so the patterns must be written in title case.
#: - :func:`add_replacements` adds further entries at run time.
REPLACE_DIMS: dict[str, dict[str, str]] = {
    "c": {
        # In land_out, for CH4 emissions from GLOBIOM
        "Agri_Ch4": "GLOBIOM|Emissions|CH4 Emissions Total",
    },
    "l": {
        # FIXME This is probably not generally applicable and should be removed
        "Final Energy": "Final Energy|Residential",
    },
    "t": {},
}

#: Replacements used in :func:`collapse` after the 'variable' column is assembled.
#: These are applied using :meth:`pandas.DataFrame.replace` with ``regex=True``; see
#: the documentation of that method. For documentation of regular expressions, see
#: https://docs.python.org/3/library/re.html and https://regex101.com.
#:
#: Keys are regular expressions; the escaped ``\|`` matches the literal pipe character
#: that separates the parts of a variable name. Values are replacement strings, which
#: may refer to groups captured by the key as ``\1``, ``\2``, etc.
#:
#: .. todo:: These may be particular or idiosyncratic to a single "template". The
#:    strings used to collapse multiple conceptual dimensions into the IAMC "variable"
#:    column are known to vary in poorly-documented ways across these templates.
#:
#:    This setting is currently applied universally. To improve, specify a different
#:    mapping with the replacements needed for each individual template, and load the
#:    correct one when reporting scenarios to that template.
REPLACE_VARS: dict[str, str] = {
    # Secondary energy: remove duplicate "Solids"
    r"(Secondary Energy\|Solids)\|Solids": r"\1",
    # CH4 emissions from MESSAGE technologies
    r"(Emissions\|CH4)\|Fugitive": r"\1|Energy|Supply|Fugitive",
    # CH4 emissions from GLOBIOM
    # NOTE(review): the first pattern below has no "land_out" prefix, unlike the three
    # that follow; confirm that it belongs under this heading.
    r"(Emissions\|CH4)\|((Gases|Liquids|Solids|Elec|Heat)(.*))": (
        r"\1|Energy|Supply|\3|Fugitive\4"
    ),
    # Expand the abbreviation "Awm" at the end of a "land_out CH4…|" prefix
    r"^(land_out CH4.*\|)Awm": r"\1Manure Management",
    # Replace the title-cased internal prefix with the reported variable name prefix
    r"^land_out CH4\|Emissions\|Ch4\|Land Use\|Agriculture\|": (
        "Emissions|CH4|AFOLU|Agriculture|Livestock|"
    ),
    # Strip internal prefix
    r"^land_out CH4\|": "",
    # Prices
    # NOTE(review): these patterns do not themselves mention "Price"; presumably they
    # are intended for price variables, but they are applied to every variable name.
    r"Residential\|(Biomass|Coal)": r"Residential|Solids|\1",
    r"Residential\|Gas": "Residential|Gases|Natural Gas",
    r"Import Energy\|Lng": "Primary Energy|Gas",
    r"Import Energy\|Coal": "Primary Energy|Coal",
    r"Import Energy\|Oil": "Primary Energy|Oil",
    r"Import Energy\|(Liquids\|(Biomass|Oil))": r"Secondary Energy|\1",
    r"Import Energy\|Lh2": "Secondary Energy|Hydrogen",
}


def as_quantity(info: Union[dict, float, str]) -> Quantity:
    """Build a :class:`genno.Quantity` from a simple description of its contents.

    .. todo:: move upstream, to :mod:`genno`.

    Parameters
    ----------
    info : dict or float or str
        - :class:`str`: parsed using the :mod:`iam_units` registry; the resulting
          magnitude and units are used for the returned quantity.
        - :class:`float`: used directly as the value; no units are given.
        - :class:`dict`: the entries "_dim" and "_unit" give the name of the single
          dimension and the units, respectively. All other entries are used as
          (label, value) pairs along that dimension. `info` itself is not modified.

    Raises
    ------
    TypeError
        if `info` is of any other type.
    KeyError
        if `info` is a :class:`dict` without a "_dim" or "_unit" entry.
    """
    if isinstance(info, float):
        return Quantity(info)

    if isinstance(info, str):
        parsed = registry.Quantity(info)
        return Quantity(parsed.magnitude, units=parsed.units)

    if not isinstance(info, dict):
        raise TypeError(type(info))

    # Separate the two special entries from the data, without altering the argument
    values = info.copy()
    dim_name, units = values.pop("_dim"), values.pop("_unit")
    series = pd.Series(values).rename_axis(dim_name)
    return Quantity(series, units=units)
def collapse(df: pd.DataFrame, var=None) -> pd.DataFrame:
    """Callback for the `collapse` argument to :meth:`~.Reporter.convert_pyam`.

    Replacements from :data:`REPLACE_DIMS` and :data:`REPLACE_VARS` are applied.
    The dimensions listed in the `var` arguments are automatically dropped from the
    returned :class:`pyam.IamDataFrame`. If ``var[0]`` contains the word "emissions",
    then :meth:`collapse_gwp_info` is invoked.

    Adapted from :func:`genno.compat.pyam.collapse`.

    Parameters
    ----------
    df : pandas.DataFrame
        Data to transform. The "c", "l", and "t" columns, if present, are converted to
        title case **in place**; " Energy" is appended to the labels in "l".
    var : list of str, optional
        Strings or dimensions to concatenate to the 'Variable' column. The first of
        these is usually a string value used to populate the column. These are joined
        using the pipe ('|') character. The list passed by the caller is never
        modified. Default: no entries.

    See also
    --------
    REPLACE_DIMS
    REPLACE_VARS
    collapse_gwp_info
    test_collapse
    """
    # Work on a private copy, so that nothing done to `var` below (including within
    # collapse_gwp_info()) can alter the caller's list, which may be re-used for
    # several calls
    var = [] if var is None else list(var)

    # Convert some dimension labels to title-case strings
    for dim in filter(lambda d: d in df.columns, "clt"):
        df[dim] = df[dim].astype(str).str.title()

    if "l" in df.columns:
        # Level: (already in title case) add the word 'energy'
        df["l"] = df["l"] + " Energy"

    if var and "emissions" in var[0].lower():
        log.info(f"Collapse GWP info for {var[0]}")
        df, var = collapse_gwp_info(df, var)

    # - Apply replacements to individual dimensions.
    # - Use the genno built-in to assemble the variable column.
    # - Apply replacements to assembled columns.
    return (
        df.replace(REPLACE_DIMS, regex=True)
        .pipe(genno_collapse, columns=dict(variable=var))
        .replace(dict(variable=REPLACE_VARS), regex=True)
    )
def collapse_gwp_info(df: pd.DataFrame, var: list) -> tuple[pd.DataFrame, list]:
    """:func:`collapse` helper for emissions data with GWP dimensions.

    The 'e equivalent' and 'gwp metric' dimensions are combined with the 'e'
    dimension, using a format like::

        '{e} ({e equivalent}-equivalent, {GWP metric} metric)'

    For example::

        'SF6 (CO2-equivalent, AR5 metric)'

    Parameters
    ----------
    df : pandas.DataFrame
        Data with columns 'e', 'e equivalent', and 'gwp metric'. The 'e' column is
        overwritten **in place** with the combined labels.
    var : list of str
        Strings or dimensions used to assemble the 'variable' column. This list is
        **not** modified.

    Returns
    -------
    pandas.DataFrame
        `df` without the 'e equivalent' and 'gwp metric' columns.
    list of str
        A new list with the entries of `var`, except 'e equivalent' and 'gwp metric'.

    If any of the three columns is missing from `df`, a warning is logged and `df` and
    `var` are returned unchanged.
    """
    # Check that *df* contains the necessary columns
    cols = ["e equivalent", "gwp metric"]
    missing = {"e", *cols} - set(df.columns)
    if missing:
        log.warning(f"…skip; {missing} not in columns {list(df.columns)}")
        return df, var

    # Format the column with original emissions species
    df["e"] = (
        df["e"]
        + " ("
        + df["e equivalent"]
        + "-equivalent, "
        + df["gwp metric"]
        + " metric)"
    )

    # Remove columns from further processing. A new list is built instead of calling
    # var.remove(), which would (a) alter the list owned by the caller and (b) raise
    # ValueError if a column is present in `df` but not listed in `var`.
    remaining = [v for v in var if v not in cols]

    return df.drop(cols, axis=1), remaining
def copy_ts(rep: Reporter, other: str, filters: Optional[dict] = None) -> Key:
    """Prepare `rep` to copy time series data from `other` to `scenario`.

    Three tasks are added to `rep`, chained in this order:

    1. "from_url", applied to `other`.
    2. "get_ts", applied to the result of (1) and `filters`.
    3. "store_ts", applied to the key "scenario" and the result of (2).

    Parameters
    ----------
    rep : Reporter
        Reporter to which the tasks are added.
    other : str
        URL of the other scenario from which to copy time series data.
    filters : dict, optional
        Filters; passed as an argument to the "get_ts" task, presumably for use with
        :meth:`ixmp.TimeSeries.timeseries`.

    Returns
    -------
    Key
        Key for the copy operation, i.e. for task (3).
    """
    # A unique ID for this copy operation, to avoid collision if copy_ts() used multiple
    # times. hash() of a str can differ from one Python process to the next, so the
    # resulting keys should not be stored or hard-coded; use the returned key instead.
    _id = f"{hash(other + repr(filters)):x}"

    k1 = rep.add("from_url", f"scenario {_id}", quote(other))
    k2 = rep.add("get_ts", f"ts data {_id}", k1, filters)
    return single_key(rep.add("store_ts", f"copy ts {_id}", "scenario", k2))
def add_replacements(dim: str, codes: Iterable[Code]) -> None:
    """Register replacements for dimension `dim` in :data:`REPLACE_DIMS`.

    Each of the `codes` that has an annotation with the ID "report" contributes one
    entry: the key is the title-cased code ID followed by "$", and the value is the
    text of the annotation. Codes without such an annotation are skipped.

    Parameters
    ----------
    dim : str
        Key of :data:`REPLACE_DIMS` to update. It must already exist once the first
        annotated code is encountered; otherwise :class:`KeyError` is raised.
    codes : iterable of Code
        Codes to inspect.
    """
    for code in codes:
        try:
            annotation = code.get_annotation(id="report")
            label = str(annotation.text)
        except KeyError:
            # No "report" annotation on this code; nothing to register
            continue

        # Deliberately outside the `try`: a missing `dim` must not be silenced
        pattern = f"{code.id.title()}$"
        REPLACE_DIMS[dim][pattern] = label