Source code for message_ix_models.tools.iamc

"""Tools for working with IAMC-structured data."""

from collections.abc import MutableMapping
from typing import TYPE_CHECKING, Any, Literal, Optional

import genno
import pandas as pd
from sdmx.message import StructureMessage
from sdmx.model import common, v21

from message_ix_models.util import cached
from message_ix_models.util.pycountry import iso_3166_alpha_3

if TYPE_CHECKING:
    import pathlib

    from genno.types import AnyQuantity

# Public names exported by ``from message_ix_models.tools.iamc import *``.
# :func:`to_quantity` and the underscore-prefixed helpers are not listed here.
__all__ = [
    "describe",
    "iamc_like_data_for_query",
]


def describe(data: pd.DataFrame, extra: Optional[str] = None) -> StructureMessage:
    """Build SDMX structure information describing IAMC-format `data`.

    Parameters
    ----------
    data :
        Data in "wide" or "long" IAMC format. The columns "MODEL", "SCENARIO",
        "REGION", "VARIABLE", and "UNIT" are read.
    extra : str, optional
        Text used in the description of each Codelist in place of the generic word
        "data".

    Returns
    -------
    sdmx.message.StructureMessage
        A message with one :class:`.Codelist` for each of the MODEL, SCENARIO,
        REGION, VARIABLE, and UNIT dimensions. Each Code in the VARIABLE code list
        carries an annotation with :py:`id="preferred-unit-measure"` whose text is
        the comma-separated UNIT label(s) that appear together with that VARIABLE.
    """
    message = StructureMessage()

    # Text that names the data source in every code list description
    subject = extra or "data"

    def _add_codelist(dim: str) -> common.Codelist:
        """Create an empty code list for `dim` and register it on the message."""
        codelist: common.Codelist = common.Codelist(
            id=dim,
            description=f"Codes appearing in the {dim!r} dimension of "
            + subject
            + ".",
            is_final=True,
            is_external_reference=False,
        )
        message.add(codelist)
        return codelist

    # Dimensions that are handled independently: one code per distinct label
    for dim in ("MODEL", "SCENARIO", "REGION"):
        codelist = _add_codelist(dim)
        for label in sorted(data[dim].unique()):
            codelist.append(common.Code(id=label))

    # "VARIABLE" and "UNIT" are handled together, so that each variable can be
    # annotated with the unit(s) it appears with
    pair_dims = ["VARIABLE", "UNIT"]
    variables = _add_codelist("VARIABLE")
    units = _add_codelist("UNIT")

    # Distinct (VARIABLE, UNIT) combinations, in sorted order
    pairs = data[pair_dims].sort_values(pair_dims).drop_duplicates()

    for variable, rows in pairs.groupby("VARIABLE"):
        variable_units = rows["UNIT"].unique()

        note = v21.Annotation(
            id="preferred-unit-measure", text=", ".join(variable_units)
        )
        variables.append(common.Code(id=variable, annotations=[note]))

        for unit in variable_units:
            try:
                units.append(common.Code(id=unit))
            except ValueError:
                # Presumably raised when the same unit was already added for
                # another variable; the existing code is kept
                continue

    return message
def _assign_n(df: pd.DataFrame, *, missing: Literal["keep", "discard"]) -> pd.DataFrame:
    """Add an "n" column derived from "REGION" via :func:`.iso_3166_alpha_3`.

    With `missing` equal to "discard", "n" holds exactly what
    :func:`.iso_3166_alpha_3` returns. With any other value, a falsy result is
    replaced by the original "REGION" label.
    """
    region = df["REGION"]

    if missing == "discard":
        codes = region.apply(iso_3166_alpha_3)
    else:
        # Fall back to the original label where no code is found
        codes = region.apply(lambda label: iso_3166_alpha_3(label) or label)

    return df.assign(n=codes)


def _drop_unique(
    df: pd.DataFrame, *, columns: str, record: MutableMapping[str, Any]
) -> pd.DataFrame:
    """Drop `columns`, provided that each one contains a single, unique value.

    `columns` is a whitespace-delimited string of column names. The single value
    found in each column is stored in `record` under the column name.

    Raises
    ------
    RuntimeError
        if any of the columns contains more than 1 distinct value.
    """
    names = columns.split()

    for name in names:
        distinct = df[name].unique()
        if len(distinct) > 1:
            raise RuntimeError(f"Not unique {name!r}: {distinct}")
        # Preserve the value that is about to be discarded with the column
        record[name] = distinct[0]

    return df.drop(names, axis=1)


def _raise_empty(df: pd.DataFrame, *, query: str) -> pd.DataFrame:
    """Return `df` unchanged, or raise :class:`RuntimeError` if it has no rows."""
    if not len(df):
        raise RuntimeError(f"0 rows matching {query!r}")

    return df
@cached
def iamc_like_data_for_query(
    path: "pathlib.Path",
    query: str,
    *,
    archive_member: Optional[str] = None,
    drop: Optional[list[str]] = None,
    non_iso_3166: Literal["keep", "discard"] = "discard",
    replace: Optional[dict] = None,
    unique: str = "MODEL SCENARIO VARIABLE UNIT",
    **kwargs,
) -> "AnyQuantity":
    """Load data from `path` in an IAMC-like format and transform to :class:`.Quantity`.

    The steps involved are:

    1. Read the data file. Additional `kwargs` are passed to
       :func:`pandas.read_csv`. By default (unless `kwargs` explicitly give a
       different value), pyarrow is used for better performance.
    2. Pass the result through :func:`to_quantity`, with the parameters `query`,
       `drop`, `non_iso_3166`, `replace`, and `unique`.
    3. Cache the result using :obj:`.cached`. Subsequent calls with the same
       arguments will yield the cached result rather than repeating steps (1) and
       (2).

    Parameters
    ----------
    path :
        Path to the data file or, if `archive_member` is given, to a ZIP archive.
    query :
        Passed to :func:`to_quantity`.
    archive_member : str, optional
        If given, `path` may be an archive with 2 or more members. The member named
        by `archive_member` is extracted and read.
    drop, non_iso_3166, replace, unique :
        Passed to :func:`to_quantity`.
    kwargs :
        Passed to :func:`pandas.read_csv`.

    Returns
    -------
    genno.Quantity
        of the same structure returned by :func:`to_quantity`.
    """
    import pandas as pd

    kwargs.setdefault("engine", "pyarrow")

    if archive_member:
        # A single member in a ZIP archive that has >1 members
        import zipfile

        # Close both the archive and the member handle once the data are read;
        # read_csv() returns a fully loaded data frame, so nothing refers to the
        # handles afterwards
        with zipfile.ZipFile(path) as zf, zf.open(archive_member) as source:
            data = pd.read_csv(source, **kwargs)
    else:
        # A direct path, possibly compressed
        data = pd.read_csv(path, **kwargs)

    return to_quantity(
        data,
        query=query,
        drop=drop,
        non_iso_3166=non_iso_3166,
        replace=replace,
        unique=unique,
    )
def to_quantity(
    data: "pd.DataFrame",
    *,
    query: str,
    drop: Optional[list[str]] = None,
    non_iso_3166: Literal["keep", "discard"] = "discard",
    replace: Optional[dict] = None,
    unique: str = "MODEL SCENARIO VARIABLE UNIT",
) -> "AnyQuantity":
    """Convert `data` in IAMC ‘wide’ structure to :class:`genno.Quantity`.

    `data` is processed via the following steps:

    1. Drop columns given in `drop`, if any.
    2. Apply `query`. This is done early to reduce the data handled in subsequent
       steps. The query string must use the original column names (with matching
       case) as appearing in `data` (or, for :func:`iamc_like_data_for_query`, in
       the file at `path`).
    3. Apply replacements from `replace`, if any.
    4. Drop columns that are entirely empty.
    5. Rename all columns/dimensions to upper case.
    6. Assert that the `unique` columns each contain exactly 1 unique value, then
       drop these columns. This means that `query` **must** result in data with
       unique values for these dimensions.
    7. Transform "REGION" codes via :func:`.iso_3166_alpha_3` to an "n" dimension
       containing ISO 3166-1 alpha-3 codes. If `non_iso_3166` is "keep", preserve
       codes that do not appear in the standard.
    8. Drop entire time series where (7) does not yield an "n" code.
    9. Transform to :class:`pandas.Series` with "n" and "y" index levels; ensure
       the latter are :class:`int`.
    10. Transform to :class:`.Quantity` and attach units.

    Parameters
    ----------
    data :
        Data frame in IAMC ‘wide’ format. The column names "Model", "Scenario",
        "Region", "Variable", and "Unit" may be in any case.
    query :
        Query to select a subset of data, passed to
        :meth:`pandas.DataFrame.query`.
    drop :
        Identifiers of columns in `data`, passed to :meth:`pandas.DataFrame.drop`.
    non_iso_3166 :
        If "discard" (default), "region" labels that are not ISO 3166-1 country
        names are discarded, along with associated data. If "keep", such labels
        are kept.
    replace :
        Replacements for values in columns, passed to
        :meth:`pandas.DataFrame.replace`.
    unique :
        Whitespace-delimited, upper-case names of columns which must contain
        unique values. These columns are dropped from the result.

    Returns
    -------
    genno.Quantity
        with at least dimensions :py:`("n", "y")`, and then a subset of
        :py:`("MODEL", "SCENARIO", "VARIABLE", "UNIT")`—only those dimensions *not*
        indicated by `unique`. If "UNIT" is in `unique`, the quantity has the
        given, unique units; otherwise, it is dimensionless.

    Raises
    ------
    RuntimeError
        if no rows match `query`, or if any of the `unique` columns contains more
        than 1 distinct value.
    """
    # Split once. The same list of names decides both the index levels and the
    # units, so that e.g. a column named "UNITS" is not mistaken for "UNIT" by a
    # substring match on the `unique` string.
    unique_dims = unique.split()

    set_index = ["n"] + sorted(
        {"MODEL", "SCENARIO", "VARIABLE", "UNIT"} - set(unique_dims)
    )

    # Populated by _drop_unique() with the single value of each `unique` column
    unique_values: dict[str, Any] = dict()

    tmp = (
        data.drop(columns=drop or [])
        .query(query)
        .replace(replace or {})
        .dropna(how="all", axis=1)
        .rename(columns=lambda c: c.upper())
        .pipe(_raise_empty, query=query)
        .pipe(_drop_unique, columns=unique, record=unique_values)
        .pipe(_assign_n, missing=non_iso_3166)
        .dropna(subset=["n"])
        .drop("REGION", axis=1)
        .set_index(set_index)
        # The remaining columns are period labels; convert to integer years
        .rename(columns=lambda y: int(y))
        .rename_axis(columns="y")
        .stack()
        .dropna()
    )

    return genno.Quantity(
        tmp,
        units=unique_values["UNIT"] if "UNIT" in unique_dims else "dimensionless",
    )