"""Generic tools for working with exogenous data sources."""
import logging
from abc import ABC, abstractmethod
from collections.abc import Mapping
from copy import deepcopy
from operator import itemgetter
from pathlib import Path
from typing import Any, Literal, Optional

from genno import Computer, Key, Quantity, quote

from message_ix_models import ScenarioInfo
from message_ix_models.model.structure import get_codes
from message_ix_models.util import cached

__all__ = [
    "MEASURES",
    "SOURCES",
    "DemoSource",
    "ExoDataSource",
    "iamc_like_data_for_query",
    "prepare_computer",
    "register_source",
]

log = logging.getLogger(__name__)

#: Supported measures. Subclasses of :class:`.ExoDataSource` may provide support for
#: other measures.
#:
#: .. todo:: Store this in a separate code list or concept scheme.
MEASURES = ("GDP", "POP")

#: Known sources for data. Use :func:`register_source` to add to this collection.
SOURCES: dict[str, type["ExoDataSource"]] = {}


class ExoDataSource(ABC):
"""Base class for sources of exogenous data."""
    #: Identifier for this particular source.
    id: str = ""

    #: Key for the returned :class:`.Quantity`. Optional. See :meth:`get_keys`.
    key: Optional[Key] = None

    #: Name for the returned :class:`.Key`/:class:`.Quantity`. Optional. See
    #: :meth:`get_keys`.
    name: str = ""

    #: Optional additional dimensions for the returned :class:`.Key`/:class:`.Quantity`.
    #: If not set by :meth:`.__init__`, the dimensions are :math:`(n, y)`.
    extra_dims: tuple[str, ...] = ()

    #: :any:`True` if :meth:`.transform` should aggregate data on the |n| dimension.
    aggregate: bool = True

    #: :any:`True` if :meth:`.transform` should interpolate data on the |y| dimension.
    interpolate: bool = True
    @abstractmethod
def __init__(self, source: str, source_kw: Mapping) -> None:
"""Handle `source` and `source_kw`.
An implementation **must**:
- Raise :class:`ValueError` if it does not recognize or cannot handle the
arguments in `source` or `source_kw`.
- Recognize and handle (if possible) a "measure" keyword in `source_kw` from
:data:`MEASURES`.
It **may**:
- Transform these into other values, for instance by mapping certain values to
others, applying regular expressions, or other operations.
- Store those values as instance attributes for use in :meth:`__call__`.
- Set :attr:`name` and/or :attr:`extra_dims` to control the behaviour of
:func:`.prepare_computer`.
- Log messages that give information that may help to debug a
:class:`ValueError` for `source` or `source_kw` that cannot be handled.
It **should not** actually load data or perform any time- or memory-intensive
operations; these should only be triggered by :meth:`.__call__`.
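
        For example, a minimal subclass (the ID, source string, and keyword handling
        here are illustrative, not part of the actual API)::

            @register_source
            class MySource(ExoDataSource):
                id = "MYSOURCE"

                def __init__(self, source, source_kw):
                    if source != "mysource":
                        raise ValueError(source)  # Not a source we recognize
                    # Store the measure for use by get_keys() and __call__()
                    self.measure = source_kw["measure"]

                def __call__(self):
                    ...  # Load and return a Quantity with dimensions (n, y)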
"""
raise ValueError

    @abstractmethod
def __call__(self) -> Quantity:
r"""Return the data.
The Quantity returned by this method **must** have dimensions
:math:`(n, y) \cup \text{extra_dims}`. If the original/upstream/raw data has
different dimensionality (fewer or more dimensions; different dimension IDs),
the code **must** transform these, make appropriate selections, etc.
"""
raise NotImplementedError

    def get_keys(self) -> tuple[Key, Key]:
"""Return the target keys for the (1) raw and (2) transformed data.
Subclasses **may** override this method to provide different targets keys. In
the default implementation, the key for the transformed data is:
1. :attr:`.key`, if any, or
2. Constructed from:
- :attr:`.name` or :attr:`.measure` in lower-case.
- The dimensions :math:`(n, y)`, plus any :attr:`.extra_dims`.
The key for the raw data is the same, with :attr`.id` as an extra tag.
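
        For example, with :class:`DemoSource` (:attr:`id` ``"DEMO"``) and the measure
        "POP", the keys are ``pop:n-y:DEMO`` (raw) and ``pop:n-y`` (transformed).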
"""
k = self.key or Key(
self.name or self.measure.lower(), ("n", "y") + self.extra_dims
)
return k + self.id, k
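
    def transform(self, c: "Computer", base_key: Key) -> Key:
        """Add tasks to `c` that transform raw data from `base_key`.

        A sketch of the default behaviour described by :attr:`aggregate` and
        :attr:`interpolate`: aggregate on the |n| dimension using the "n::groups"
        mapping, then interpolate on the |y| dimension to the "y::coords" labels, both
        prepared by :func:`.prepare_computer`. The exact operator arguments used here
        are illustrative.
        """
        k = base_key
        if self.aggregate:
            # Aggregate on the n dimension using the node code list
            k = c.add(k + "agg", "aggregate", k, "n::groups", keep=False)
        if self.interpolate:
            # Interpolate on the y dimension to the periods of the target structure
            kw = dict(fill_value="extrapolate")
            k = c.add(k + "interp", "interpolate", k, "y::coords", kwargs=kw)
        return k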


def prepare_computer(
context,
c: "Computer",
source="test",
source_kw: Optional[Mapping] = None,
*,
strict: bool = True,
) -> tuple[Key, ...]:
"""Prepare `c` to compute GDP, population, or other exogenous data.
Check each :class:`ExoDataSource` in :data:`SOURCES` to determine whether it
recognizes and can handle `source` and `source_kw`. If a source is identified, add
tasks to `c` that retrieve and process data into a :class:`.Quantity` with, at
least, dimensions :math:`(n, y)`.
Parameters
----------
source : str
Identifier of the source, possibly with other information to be handled by a
:class:`ExoDataSource`.
source_kw : dict, optional
Keyword arguments for a Source class. These can include indexers, selectors, or
other information needed by the source class to identify the data to be
returned.
If the key "measure" is present, it **should** be one of :data:`MEASURES`.
strict : bool, optional
Raise an exception if any of the keys to be added already exist.
Returns
-------
tuple of .Key
Raises
------
ValueError
if no source is registered which can handle `source` and `source_kw`.
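
    Examples
    --------
    A minimal sketch using the built-in :class:`DemoSource`; the ``context`` value is
    assumed to be prepared elsewhere:

    >>> from genno import Computer
    >>> c = Computer()
    >>> keys = prepare_computer(
    ...     context, c, source="test s1", source_kw=dict(measure="POP")
    ... )
    >>> pop = c.get(keys[0])  # Quantity with dimensions (n, y)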
"""
# Handle arguments
source_kw = source_kw or dict()
if measure := source_kw.get("measure"):
# commented: quiet logging; MEASURES is not automatically updated to include the
# IDs of measures recognized by additional SOURCES
# TODO Remove use of this variable, below, by ensuring that source_obj.name is
# always present
# if measure not in MEASURES:
# log.debug(f"source keyword {measure = } not in recognized {MEASURES}")
del measure
# Look up input data flow
source_obj = None
for cls in SOURCES.values():
try:
# Instantiate a Source object to provide this data
source_obj = cls(source, deepcopy(source_kw or dict()))
# except Exception as e: # For debugging
# log.debug(repr(e))
except Exception:
pass # Class does not recognize the arguments
if source_obj is None:
raise ValueError(f"No source found that can handle {source!r}")
# Add structural information to the Computer
c.require_compat("message_ix_models.report.operator")
c.graph.setdefault("context", context)
# Retrieve the node codelist
c.add("n::codes", quote(get_codes(f"node/{context.model.regions}")), strict=strict)
# Convert the codelist into a nested dict for aggregate()
c.add("n::groups", "codelist_to_groups", "n::codes", strict=strict)
# Add information about the list of periods
if "y" not in c:
info = ScenarioInfo()
info.year_from_codes(get_codes(f"year/{context.model.years}"))
c.add("y", quote(info.Y))
if "y0" not in c:
c.add("y0", itemgetter(0), "y")
# Above as coords/indexers
c.add("y::coords", lambda years: dict(y=years), "y")
c.add("y0::coord", lambda year: dict(y=year), "y0")
# Retrieve the keys that will refer to the raw and transformed data
k_raw, k = source_obj.get_keys()
# Keys to return
keys = [k]
# Retrieve the raw data by invoking ExoDataSource.__call__
c.add(k_raw, source_obj)
# Allow the class to add further tasks that transform the data. See
# ExoDataSource.transform() for the default: aggregate, then interpolate.
key = source_obj.transform(c, k_raw)
# Alias `key` -> `k`
c.add(k, key)
# Index to y0
k_y0 = k + "y0_indexed"
c.add(k_y0, "index_to", k, "y0::coord")
keys.append(k_y0)
# TODO Index to a particular label on the "n" dimension
# TODO Index on both "n" and "y"
return tuple(keys)


def register_source(cls: type[ExoDataSource]) -> type[ExoDataSource]:
"""Register :class:`.ExoDataSource` `cls` as a source of exogenous data."""
if cls.id in SOURCES:
raise ValueError(f"{SOURCES[cls.id]} already registered for id {cls.id!r}")
SOURCES[cls.id] = cls
return cls


@register_source
class DemoSource(ExoDataSource):
"""Example source of exogenous population and GDP data.
Parameters
----------
source : str
**Must** be like ``test s1``, where "s1" is a scenario ID from ("s0"…"s4").
source_kw : dict
**Must** contain an element "measure", one of :data:`MEASURES`.
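
    Examples
    --------
    Direct use, as a sketch; normally the class is selected via
    :func:`prepare_computer`:

    >>> ds = DemoSource("test s1", dict(measure="GDP"))
    >>> ds()  # Quantity with dimensions (n, y)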
"""
id = "DEMO"
def __init__(self, source, source_kw):
if not source.startswith("test "):
# Don't recognize this `source` string → can't provide data
raise ValueError
# Select the data according to the `source`; in this case, scenario
*parts, scenario = source.partition("test ")
self.indexers = dict(s=scenario)
# Map from the measure ID to a variable name
self.measure = source_kw["measure"]
self.indexers.update(v={"POP": "Population", "GDP": "GDP"}[self.measure])

    def __call__(self) -> Quantity:
from genno.operator import select
# - Retrieve the data.
# - Apply the prepared indexers.
return self.random_data().pipe(select, self.indexers, drop=True)

    @staticmethod
def random_data():
"""Generate some random data with n, y, s, and v dimensions."""
from genno.operator import relabel
from genno.testing import random_qty
from pycountry import countries
return random_qty(dict(n=len(countries), y=2, s=5, v=2), units="kg").pipe(
relabel,
n={f"n{i}": c.alpha_3 for i, c in enumerate(countries)},
v={"v0": "Population", "v1": "GDP"},
y={"y0": 2010, "y1": 2050},
)


@cached
def iamc_like_data_for_query(
path: Path,
query: str,
*,
archive_member: Optional[str] = None,
drop: Optional[list[str]] = None,
non_iso_3166: Literal["keep", "discard"] = "discard",
replace: Optional[dict] = None,
unique: str = "MODEL SCENARIO VARIABLE UNIT",
**kwargs,
) -> Quantity:
"""Load data from `path` in IAMC-like format and transform to :class:`.Quantity`.
The steps involved are:
1. Read the data file; use pyarrow for better performance.
2. Immediately apply `query` to reduce the data to be handled in subsequent steps.
3. Assert that Model, Scenario, Variable, and Unit are unique; store the unique
values. This means that `query` **must** result in data with unique values for
these dimensions.
4. Transform "Region" labels to ISO 3166-1 alpha-3 codes using
:func:`.iso_3166_alpha_3`.
5. Drop entire time series without such codes; for instance "World".
6. Transform to a pd.Series with "n" and "y" index levels; ensure the latter are
int.
7. Transform to :class:`.Quantity` with units.
The result is :obj:`.cached`.
Parameters
----------
archive_member : bool, optional
If given, `path` may be an archive with 2 or more members. The member named by
`archive_member` is extracted and read.
non_iso_3166 : bool, optional
If "discard" (default), "region" labels that are not ISO 3166-1 country names
are discarded, along with associated data. If "keep", such labels are kept.
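
    Examples
    --------
    A sketch with a hypothetical file and query; the path and query values are
    illustrative only:

    >>> qty = iamc_like_data_for_query(
    ...     Path("data", "gdp.csv.gz"),
    ...     query="SCENARIO == 'SSP2' and VARIABLE == 'GDP|PPP'",
    ... )
    >>> qty  # Quantity with dimensions (n, y)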
"""
import pandas as pd
from message_ix_models.util.pycountry import iso_3166_alpha_3
unique_values = dict()

    def drop_unique(df, names) -> pd.DataFrame:
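        """Store the unique value of each `names` column, then drop those columns."""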
if len(df) == 0:
raise RuntimeError(f"0 rows matching {query!r}")
names_list = names.split()
for name in names_list:
values = df[name].unique()
if len(values) > 1:
raise RuntimeError(f"Not unique {name!r}: {values}")
unique_values[name] = values[0]
return df.drop(names_list, axis=1)

    def assign_n(df: pd.DataFrame) -> pd.DataFrame:
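        """Add an "n" column of ISO 3166-1 alpha-3 codes from "REGION" labels."""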
if non_iso_3166 == "discard":
return df.assign(n=df["REGION"].apply(iso_3166_alpha_3))
else:
return df.assign(n=df["REGION"].apply(lambda v: iso_3166_alpha_3(v) or v))
# Identify the source object/buffer to read from
if archive_member:
# A single member in a ZIP archive that has >1 members
import zipfile
zf = zipfile.ZipFile(path)
source: Any = zf.open(archive_member)
else:
# A direct path, possibly compressed
source = path
kwargs.setdefault("engine", "pyarrow")
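
    # Index on "n" plus any IAMC dimension columns not required to be unique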
set_index = ["n"] + sorted(
set(["MODEL", "SCENARIO", "VARIABLE", "UNIT"]) - set(unique.split())
)
tmp = (
pd.read_csv(source, **kwargs)
.drop(columns=drop or [])
.query(query)
.replace(replace or {})
.dropna(how="all", axis=1)
.rename(columns=lambda c: c.upper())
.pipe(drop_unique, unique)
.pipe(assign_n)
.dropna(subset=["n"])
.drop("REGION", axis=1)
.set_index(set_index)
.rename(columns=lambda y: int(y))
.rename_axis(columns="y")
.stack()
.dropna()
)
return Quantity(tmp, units=unique_values["UNIT"])