# Source code for message_ix_models.report.util

import logging
from collections.abc import Iterable
from dataclasses import dataclass, field
from itertools import count

import pandas as pd
from dask.core import quote
from genno import Computer, Key, Keys
from genno.compat.pyam.util import collapse as genno_collapse
from genno.core.key import single_key
from message_ix import Reporter
from sdmx.model.common import Code

log = logging.getLogger(__name__)


#: Replacements used in :func:`collapse`, keyed by dimension ID.
#: Each inner mapping is applied using :meth:`pandas.DataFrame.replace` with
#: ``regex=True``; see the documentation of that method.
#:
#: - Applied to whole strings along each dimension.
#: - These columns have :meth:`str.title` applied before these replacements.
#:
#: See also :func:`add_replacements`, which adds further entries at run time.
REPLACE_DIMS: dict[str, dict[str, str]] = dict(
    c={
        # in land_out, for CH4 emissions from GLOBIOM
        "Agri_Ch4": "GLOBIOM|Emissions|CH4 Emissions Total",
    },
    l={
        # FIXME this is probably not generally applicable and should be removed
        "Final Energy": "Final Energy|Residential",
    },
    # Initially empty
    t={},
)

#: Replacements used in :func:`collapse` after 'variable' labels are constructed. These
#: are applied using :meth:`pandas.DataFrame.replace` with ``regex=True``; see the
#: documentation of that method. For documentation of regular expressions, see
#: https://docs.python.org/3/library/re.html and https://regex101.com.
#:
#: Keys are regular expression patterns matched against the assembled 'variable'
#: label; values are the replacement strings, which may contain back-references such
#: as ``\1`` to groups captured by the pattern.
#:
#: .. todo:: These may be particular or idiosyncratic to a single 'template'. The
#:    strings used to collapse multiple conceptual dimensions into the IAMC 'variable'
#:    dimension are known to vary across these templates, in ways that are sometimes not
#:    documented.
#:
#:    This setting is currently applied universally. To improve, specify a different
#:    mapping with the replacements needed for each individual template, and load the
#:    correct one when reporting scenarios to that template.
# NOTE(review): several patterns below overlap (for instance the "land_out CH4"
# entries); confirm how pandas orders the replacements before reordering entries.
REPLACE_VARS: dict[str, str] = {
    # Secondary energy: remove duplicate "Solids"
    r"(Secondary Energy\|Solids)\|Solids": r"\1",
    # CH4 emissions from MESSAGE technologies
    r"(Emissions\|CH4)\|Fugitive": r"\1|Energy|Supply|Fugitive",
    # CH4 emissions from GLOBIOM
    r"(Emissions\|CH4)\|((Gases|Liquids|Solids|Elec|Heat)(.*))": (
        r"\1|Energy|Supply|\3|Fugitive\4"
    ),
    # Labels starting with "land_out CH4": spell out "Awm" as "Manure Management"
    r"^(land_out CH4.*\|)Awm": r"\1Manure Management",
    # Labels starting with "land_out CH4": replace the whole land-use agriculture
    # prefix with the "Emissions|CH4|AFOLU|Agriculture|Livestock|" prefix
    r"^land_out CH4\|Emissions\|Ch4\|Land Use\|Agriculture\|": (
        "Emissions|CH4|AFOLU|Agriculture|Livestock|"
    ),
    # Strip internal prefix
    r"^land_out CH4\|": "",
    # Prices
    r"Residential\|(Biomass|Coal)": r"Residential|Solids|\1",
    r"Residential\|Gas": "Residential|Gases|Natural Gas",
    r"Import Energy\|Lng": "Primary Energy|Gas",
    r"Import Energy\|Coal": "Primary Energy|Coal",
    r"Import Energy\|Oil": "Primary Energy|Oil",
    r"Import Energy\|(Liquids\|(Biomass|Oil))": r"Secondary Energy|\1",
    r"Import Energy\|Lh2": "Secondary Energy|Hydrogen",
}


#: Dimension IDs renamed to IAMC column names; passed as the `rename` argument to
#: :func:`genno.compat.pyam.iamc` by :meth:`IAMCConversion.add_tasks`.
_RENAME = dict.fromkeys(("n", "nl"), "region") | dict.fromkeys(
    ("y", "ya", "yv"), "year"
)


@dataclass
class IAMCConversion:
    """Description of a conversion to IAMC data structure.

    The instance fields hold the information needed to prepare the conversion;
    :meth:`add_tasks` adds the tasks that perform it to a :class:`.Computer`.
    """

    #: Key for data to be converted. The value given is passed through
    #: :class:`genno.Key` when the instance is created.
    base: Key

    #: Parts of the variable expression. This is passed as the :py:`var` argument to
    #: :func:`collapse`. The first entry is also used to construct key names.
    var_parts: list[str]

    #: Exact unit string for output.
    unit: str

    #: Dimension(s) to sum over; each entry is an expression like :py:`"x-y-z"`.
    sums: list[str] = field(default_factory=list)

    #: If :any:`True`, ensure data is present for ``R##_GLB``.
    GLB_zeros: bool = False

    def __post_init__(self) -> None:
        # Normalize whatever was supplied for `base` to a Key
        self.base = Key(self.base)

    def add_tasks(self, c: "Computer") -> None:
        """Add tasks to convert :attr:`base` to IAMC structure.

        The tasks include, in order:

        1. If :attr:`GLB_zeros` is :any:`True`:

           - Create a quantity with the same shape as :attr:`base`, filled with
             zeros and without the :math:`n` dimension ("zeros_like"), then give it
             an :math:`n` dimension using the coords at ``coords.n_glb``
             ("expand_dims").
           - Add this to :attr:`base`.

           Otherwise, the data at :attr:`base` are used as-is.

        2. Convert to the given :attr:`unit` ("convert_units").

        Steps (3) to (6) are repeated for an empty string (:py:`""`) and then for
        each expression like :py:`"x-y-z"` in :attr:`sums`.

        3. Drop the given dimension(s) (if any) from the key from step (2). For
           example, if :attr:`base` is ``<foo:x-y-z>`` and :attr:`sums` includes
           :py:`"x-z"`, this gives a reference to ``<foo:y>``.
        4. Reduce :attr:`var_parts` in the same way. For example, if
           :attr:`var_parts` is :py:`["Variable prefix", "z", "x", "y", "Foo"]`, the
           above sum reduces this to :py:`["Variable prefix", "y", "Foo"]`.
        5. Call :func:`genno.compat.pyam.iamc` to add further tasks that convert the
           quantity from (3) to IAMC structure, with :func:`collapse` in this module
           as the callback that formats labels. The resulting keys are like
           ``{label} 0::iamc``, ``{label} 1::iamc``, etc., where ``{label}`` is the
           first entry of :attr:`var_parts` and numbering starts at the first
           integer for which no such key is already in `c`.
        6. Append the keys from (5) to the task at :data:`.report.key.all_iamc`.
        """
        from genno.compat.pyam import iamc as handle_iamc

        from .key import all_iamc, coords

        k = Keys(base=self.base, glb=self.base + "glb")

        if not self.GLB_zeros:
            # Nothing to add: alias the base data under the first intermediate key
            c.add(k.base[0], k.base)
        else:
            # Zeros shaped like the base data, minus the 'n' dimension…
            c.add(k.glb[0], "zeros_like", self.base, drop=["n"])
            # …given an 'n' dimension again…
            c.add(k.glb[1], "expand_dims", k.glb[0], coords.n_glb)
            # …and added to the base data, which is used in the following steps
            c.add(k.base[0], "add", self.base, k.glb[1])

        # Unit conversion; the result is the input to every IAMC conversion below
        c.add(k.base[1], "convert_units", k.base[0], units=self.unit, sums=True)

        # First integer suffix for which no "{label} N::iamc" key exists yet
        label = self.var_parts[0]
        start = next(n for n in count() if f"{label} {n}::iamc" not in c)

        # One conversion for the un-summed data (""), then one per entry in `sums`
        # TODO move some or all of this logic upstream
        keys = []
        for i, expr in enumerate([""] + self.sums, start=start):
            dims = expr.split("-")
            name = f"{label} {i}"

            # Arguments for genno's built-in handler:
            # - `base`: the converted data with `dims` dropped from the key.
            # - `variable`: used only to construct keys; the resulting IAMC-
            #   structured data is available at `{variable}::iamc`.
            # - `collapse`: collapse() from this module, given `var_parts` without
            #   any of the summed dimensions.
            info = dict(
                rename=_RENAME,
                unit=self.unit,
                base=k.base[1].drop(*dims),
                variable=name,
                collapse=dict(
                    callback=collapse,
                    var=[v for v in self.var_parts if v not in dims],
                ),
            )
            handle_iamc(c, info)

            keys.append(f"{name}::iamc")

        # Extend the all::iamc task with each of `keys`
        c.graph[all_iamc] += tuple(keys)
def collapse(df: pd.DataFrame, var: list[str] | None = None) -> pd.DataFrame:
    """Callback for the `collapse` argument to :meth:`~.Reporter.convert_pyam`.

    Replacements from :data:`REPLACE_DIMS` and :data:`REPLACE_VARS` are applied.
    The dimensions listed in the `var` argument are automatically dropped from the
    returned data frame. If :py:`var[0]` contains the word "emissions" (in any
    case), then :func:`collapse_gwp_info` is invoked.

    Adapted from :func:`genno.compat.pyam.collapse`.

    Parameters
    ----------
    df : pandas.DataFrame
        Data to be collapsed. The "c", "l", and "t" columns, if present, are
        modified in place.
    var : list of str, optional
        Strings or dimensions to concatenate to a 'variable' string. The first of
        these is usually a :class:`str` used to populate the column; others may be
        fixed strings or the IDs of dimensions in the input data. The components are
        joined using the pipe ('|') character. The sequence passed is never
        modified. If omitted, an empty list is used.

    Returns
    -------
    pandas.DataFrame
        Data with the 'variable' column assembled and replacements applied.

    See also
    --------
    REPLACE_DIMS
    REPLACE_VARS
    collapse_gwp_info
    test_collapse
    """
    # Work on a private copy. The same list may be stored by the caller and passed
    # again on a later call, so it must not be altered here or further down.
    var = [] if var is None else list(var)

    # Convert some dimension labels to title-case strings
    for dim in filter(lambda d: d in df.columns, "clt"):
        df[dim] = df[dim].astype(str).str.title()

    if "l" in df.columns:
        # Level: to title case, add the word 'energy'
        df["l"] = df["l"] + " Energy"

    if var and "emissions" in var[0].lower():
        log.info("Collapse GWP info for %s", var[0])
        df, var = collapse_gwp_info(df, var)

    # - Apply replacements to individual dimensions.
    # - Use the genno built-in to assemble the variable column.
    # - Apply replacements to assembled columns.
    return (
        df.replace(REPLACE_DIMS, regex=True)
        .pipe(genno_collapse, columns=dict(variable=var))
        .replace(dict(variable=REPLACE_VARS), regex=True)
    )
def collapse_gwp_info(df, var):
    """:func:`collapse` helper for emissions data with GWP dimensions.

    The 'e equivalent' and 'gwp metric' dimensions are combined with the 'e'
    dimension, using a format like::

        '{e} ({e equivalent}-equivalent, {GWP metric} metric)'

    For example::

        'SF6 (CO2-equivalent, AR5 metric)'

    Neither `df` nor `var` is modified; new objects are returned.

    Parameters
    ----------
    df : pandas.DataFrame
        Data with columns 'e', 'e equivalent', and 'gwp metric'.
    var : list of str
        Parts of the 'variable' expression.

    Returns
    -------
    tuple
        2-tuple of (data frame, list of str). If any of the three columns is
        missing from `df`, a warning is logged and `df` and `var` are returned
        unchanged. Otherwise, the data frame has the combined labels in 'e' and
        lacks the other two columns, and the list is `var` without the names of
        those two columns.
    """
    # Check that *df* contains the necessary columns
    cols = ["e equivalent", "gwp metric"]
    missing = {"e", *cols} - set(df.columns)
    if missing:
        log.warning("…skip; %s not in columns %s", missing, list(df.columns))
        return df, var

    # Combined label, starting from the original emissions species
    label = (
        df["e"]
        + " ("
        + df["e equivalent"]
        + "-equivalent, "
        + df["gwp metric"]
        + " metric)"
    )

    # Remove the columns from further processing. Build a new list instead of
    # calling var.remove(): that would alter the caller's list, and would raise
    # ValueError if an entry were not present.
    remaining = [v for v in var if v not in cols]

    return df.assign(e=label).drop(columns=cols), remaining
def copy_ts(rep: Reporter, other: str, filters: dict | None) -> Key:
    """Prepare `rep` to copy time series data from `other` to `scenario`.

    Three tasks are added to `rep`, using the operators "from_url", "get_ts", and
    "store_ts" in that order; each consumes the key of the one before.

    Parameters
    ----------
    rep : Reporter
        Reporter to which the tasks are added.
    other : str
        URL of the other scenario from which to copy time series data.
    filters : dict or None
        Filters; given as an argument to the "get_ts" task.

    Returns
    -------
    Key
        Key for the copy operation, that is, for the "store_ts" task.
    """
    # Tag derived from the arguments, so that the keys added by separate copy_ts()
    # calls do not collide. Built-in hash() of a str differs between interpreter
    # sessions by default, so the tag (and thus the key names) are not stable
    # across runs.
    tag = format(hash(other + repr(filters)), "x")

    source = rep.add("from_url", f"scenario {tag}", quote(other))
    data = rep.add("get_ts", f"ts data {tag}", source, filters)
    stored = rep.add("store_ts", f"copy ts {tag}", "scenario", data)

    return single_key(stored)
def add_replacements(dim: str, codes: Iterable[Code]) -> None:
    """Update :data:`REPLACE_DIMS` for dimension `dim` with values from `codes`.

    For each code, a label is chosen as the first of the following with a non-empty
    string representation:

    1. the text of the annotation with ID ``report``, if the code has one;
    2. the code's name;
    3. the code's ID.

    If the label differs from the code ID, the title-cased ID followed by ``$`` is
    mapped to the label.

    For example, the following in one of the :doc:`/pkg-data/codelists`:

    .. code-block:: yaml

       foo:
         report: fOO
       bar:
         report: Baz
       qux: {}  # No "report" annotation → no mapping

    …results in entries :py:`{"Foo$": "fOO", "Bar$": "Baz"}` added to
    :data:`REPLACE_DIMS` and used by :func:`collapse`.

    Parameters
    ----------
    dim : str
        Dimension ID; a key of :data:`REPLACE_DIMS`.
    codes : iterable of Code
        Codes to inspect.
    """
    for code in codes:
        # Candidate labels, in increasing order of preference
        options = [code.id, code.name]
        try:
            report_text = code.get_annotation(id="report").text
        except KeyError:
            # No "report" annotation on this code
            pass
        else:
            options.append(report_text)

        # Most-preferred candidate whose string representation is non-empty
        label = next(text for text in map(str, reversed(options)) if text)

        if label == code.id:
            # Label is the same as the ID; nothing to replace
            continue

        REPLACE_DIMS[dim][f"{code.id.title()}$"] = label
# FIXME Type as "Computer" str alias, when supported by genno.Computer.apply()
def store_write_ts(c: Computer, base_key: Key) -> Key:
    """Add tasks to store and write files with time-series data from `base_key`.

    `base_key` **should** refer to a task that returns time-series data in the IAMC
    structure; that is, the format used by :meth:`ixmp.TimeSeries.add_timeseries`.

    If `base_key` is for instance "foo::iamc", this function adds the following
    keys:

    "foo::iamc+csv"
       Write the data in `base_key` to a file named :file:`foo.csv`, wherein the
       file stem is the same as the :attr:`.Key.name` of `base_key`.
    "foo::iamc+xlsx"
       Write the data in `base_key` to a file named :file:`foo.xlsx`.
    "foo::iamc+file"
       Both of "foo::iamc+csv" and "foo::iamc+xlsx".
    "foo::iamc+store"
       Store the data in `base_key` using the "store_ts" operator, on the object
       at the key "scenario".
    "foo::iamc+all"
       All of "foo::iamc+store", "foo::iamc+csv", and "foo::iamc+xlsx".

    The path for each file is produced by a separate "make_output_path" task,
    which is given the key "config" and the file name.

    Other code **may** then :meth:`~.Reporter.get` any of these keys, as needed, to
    perform some or all of these tasks.

    Returns
    -------
    Key
        the "+all" key described above.

    Example
    -------
    >>> rep = Reporter(...)
    >>> result = rep.apply(store_write_ts, "foo::iamc")
    >>> result
    <foo::iamc+all>
    """
    k = Key(base_key)

    def _add_writer(suffix: str):
        """Add the path and write tasks for one file type; return the write key."""
        target = c.add(
            k[f"{suffix} path"],
            "make_output_path",
            "config",
            name=f"{k.name}.{suffix}",
        )
        return c.add(k[suffix], "write_report", base_key, target)

    file_keys = [_add_writer(suffix) for suffix in ("csv", "xlsx")]

    # Write all files
    c.add(k["file"], "summarize", *file_keys)

    # Store data on "scenario"
    c.add(k["store"], "store_ts", "scenario", base_key)

    # Both write and store
    c.add(k["all"], "summarize", k["store"], *file_keys)

    return k["all"]