import logging
from abc import abstractmethod
from collections.abc import Mapping, Sequence
from functools import cache
from importlib.util import find_spec
from pathlib import Path
from typing import TYPE_CHECKING, Any, Literal, Optional, cast
from warnings import warn

import pandas as pd
from genno import Quantity
from genno.operator import concat

from ._logging import once

if TYPE_CHECKING:
    from genno.types import TQuantity

    from .context import Context
log = logging.getLogger(__name__)

#: :any:`True` if :mod:`message_data` is installed.
HAS_MESSAGE_DATA = False

#: Root directory of the :mod:`message_data` repository. This package is always
#: installed from source.
MESSAGE_DATA_PATH: Optional[Path] = None

if _spec := find_spec("message_data"):  # pragma: no cover
    HAS_MESSAGE_DATA = True
    assert _spec.origin is not None
    MESSAGE_DATA_PATH = Path(_spec.origin).parents[1]

#: Directory containing message_ix_models.__init__.
MESSAGE_MODELS_PATH = Path(__file__).parents[1]

#: Package data already loaded with :func:`load_package_data`.
PACKAGE_DATA: dict[str, Any] = dict()

#: Data already loaded with :func:`load_private_data`.
PRIVATE_DATA: dict[str, Any] = dict()


class Adapter:
    """Adapt `data`.

    Adapter is an abstract base class for tools that adapt data in any way, e.g.
    between different code lists for certain dimensions. An instance of an Adapter
    can be called with any of the following as `data`:

    - :class:`genno.Quantity`,
    - :class:`pandas.DataFrame`, or
    - :class:`dict` mapping :class:`str` parameter names to values (either of the
      above types).

    …and will return data of the same type.

    Subclasses can implement different adapter logic by overriding the abstract
    :meth:`adapt` method.
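
    Example
    -------
    A minimal sketch of a concrete subclass (illustrative; not part of this
    module) that doubles all values:

    >>> class Doubler(Adapter):
    ...     def adapt(self, qty):
    ...         return qty * 2.0
    >>> df = pd.DataFrame([["a", 1.0]], columns=["foo", "value"])
    >>> Doubler()(df)  # doctest: +SKIP
      foo  value unit
    0   a    2.0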
"""
def __call__(self, data):
if isinstance(data, Quantity):
return self.adapt(data)
elif isinstance(data, pd.DataFrame):
# Convert to Quantity
idx_cols = list(filter(lambda c: c not in ("value", "unit"), data.columns))
qty = Quantity.from_series(data.set_index(idx_cols)["value"])
# Store units
if "unit" in data.columns:
units = data["unit"].unique()
assert 1 == len(units), f"Non-unique units {units}"
unit = units[0]
else:
unit = "" # dimensionless
# Adapt, convert back to pd.DataFrame, return
return self.adapt(qty).to_dataframe().assign(unit=unit).reset_index()
elif isinstance(data, Mapping):
return {par: self(value) for par, value in data.items()}
else:
raise TypeError(type(data))

    @abstractmethod
    def adapt(self, qty: "TQuantity") -> "TQuantity":
        """Adapt data."""


class MappingAdapter(Adapter):
    """Adapt data using mappings for 1 or more dimension(s).

    Parameters
    ----------
    maps : dict of sequence of tuple
        Keys are names of dimensions. Values are sequences of 2-tuples; each tuple
        consists of an original label and a target label.
    on_missing :
        If provided (default :any:`None`), perform the given action if `maps` do
        not contain all the labels in the respective dimensions of each Quantity
        passed to :meth:`adapt`:

        - "log": log a message on level :data:`logging.WARNING`.
        - "raise": raise :class:`RuntimeError`.
        - "warn": emit :class:`RuntimeWarning`.

    Examples
    --------
    >>> a = MappingAdapter({"foo": [("a", "x"), ("a", "y"), ("b", "z")]})
    >>> df = pd.DataFrame(
    ...     [["a", "m", 1], ["b", "n", 2]], columns=["foo", "bar", "value"]
    ... )
    >>> a(df)
      foo bar  value
    0   x   m      1
    1   y   m      1
    2   z   n      2
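
    With `on_missing` given, labels in the data that do not appear in `maps` are
    handled explicitly instead of being silently dropped. An illustrative sketch
    (the exact message text may differ):

    >>> a = MappingAdapter({"foo": [("a", "x")]}, on_missing="raise")
    >>> a(df)  # doctest: +SKIP
    RuntimeError: Original coords foo={'b'} not mapped to any coords ...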
"""
maps: Mapping
on_missing: Optional[Literal["log", "raise", "warn"]]
def __init__(
self,
maps: Mapping[str, Sequence[tuple[str, str]]],
*,
on_missing: Optional[Literal["log", "raise", "warn"]] = None,
) -> None:
self.maps = maps
self.on_missing = on_missing

    @classmethod
    def from_dicts(
        cls,
        *values: dict[str, Sequence[str]],
        dims: Sequence[str],
        map_leaves: bool = True,
        # Passed to __init__
        on_missing: Optional[Literal["log", "raise", "warn"]],
    ) -> "MappingAdapter":
        """Construct a MappingAdapter from sequences of :class:`dict` and dimensions."""
        maps: dict[str, list[tuple[str, str]]] = dict()

        for dim, v in zip(dims, values):
            maps[dim] = []
            dim_all = set()
            for group, labels in v.items():
                maps[dim].extend((group, label) for label in labels)
                dim_all |= set(labels)
            if map_leaves:
                maps[dim].extend((label, label) for label in dim_all)

        return cls(maps, on_missing=on_missing)
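
    # Illustrative sketch of from_dicts() (hypothetical labels): each dict maps a
    # group label to its leaf labels, so
    #
    #     MappingAdapter.from_dicts({"G": ["a", "b"]}, dims=["foo"], on_missing=None)
    #
    # is equivalent to
    #
    #     MappingAdapter({"foo": [("G", "a"), ("G", "b"), ("a", "a"), ("b", "b")]})
    #
    # where the last two pairs (in unspecified order) come from map_leaves=True.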

    def adapt(self, qty: "TQuantity") -> "TQuantity":
        result = qty
        coords = qty.coords

        for dim, labels in self.maps.items():
            if dim not in qty.dims:
                continue

            dim_coords = set(coords[dim].data)

            # Check for coords in the data that are missing from `maps`.
            # Skip if on_missing is None.
            if (
                missing := (dim_coords - {a for (a, b) in labels})
                if self.on_missing
                else set()
            ):
                msg = (
                    f"Original coords {dim}={missing} not mapped to any coords and "
                    f"{'would be' if self.on_missing == 'raise' else 'are'} dropped"
                )
                if self.on_missing == "log":
                    log.warning(msg)
                elif self.on_missing == "raise":
                    raise RuntimeError(msg)
                elif self.on_missing == "warn":
                    warn(msg, RuntimeWarning, stacklevel=2)
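
            # For each (original, target) pair: select the values at the original
            # label `a`, drop that coord, and re-attach them under the target label
            # `b`. An original label mapped to several targets (as in the class
            # docstring example) duplicates the corresponding values.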
            result = concat(
                *[
                    result.sel({dim: a}, drop=True).expand_dims({dim: [b]})
                    for (a, b) in labels
                    if a in dim_coords  # Skip pairs where `a` is not in `dim` of `qty`
                ]
            )

        return result


class WildcardAdapter(Adapter):
    """Adapt data by broadcasting wildcard ("*") entries along 1 dimension."""

    dim: str
    coords: set[str]

    def __init__(self, dim: str, coords: Sequence[str]) -> None:
        self.dim = dim
        self.coords = set(coords)

    @cache
    def _coord_map(self, labels: tuple[str, ...]) -> pd.DataFrame:
        """Cached helper returning a data frame with two columns:

        - :py:`self.dim` with original coords along :attr:`dim`, including the
          wildcard "*".
        - "__new" with the full set of :attr:`coords`.
"""
_l = set(labels) - {"*"}
# Missing coords to be filled using wildcard
to_wildcard = self.coords - _l
# Mapping from existing labels to labels to appear in result
return pd.DataFrame(
[[c, c] for c in _l] + [["*", c] for c in to_wildcard],
columns=[self.dim, "__new"],
)

    def adapt(self, qty: "TQuantity") -> "TQuantity":
        # Identify the dimensions to group on
        groupby_dims = list(qty.dims) + ["__preserve"]
        groupby_dims.remove(self.dim)
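
        # Note: "__preserve", added below via assign(), is a constant dummy key, so
        # groupby() always has at least one key, even when `self.dim` is the only
        # dimension of `qty`. The extra index level is dropped again after apply().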

        def wildcard_group(x: pd.DataFrame) -> pd.DataFrame:
            """Apply the wildcard operation to the group data frame `x`."""
            # Coordinates for `self.dim` appearing in the group
            c_in_group = tuple(sorted(x[self.dim].unique()))

            if "*" not in c_in_group:
                # Nothing to wildcard in this group
                result = x
            else:
                # - Retrieve a _coord_map for this group.
                # - Merge with `x` (inner join on `self.dim`).
                # - Replace the original `self.dim` column with "__new".
                result = (
                    x.merge(self._coord_map(c_in_group), on=self.dim)
                    .drop(self.dim, axis=1)
                    .rename(columns={"__new": self.dim})
                )

            # Make `self.dim` an index level
            return result.set_index(self.dim)

        # - Convert to pd.DataFrame, reset index to columns.
        # - Group on `groupby_dims`.
        # - Apply wildcard_group() to each group.
        # - Convert back to Quantity.
        q_result = type(qty)(
            qty.to_frame()
            .reset_index()
            .assign(__preserve="")
            .groupby(groupby_dims)
            .apply(wildcard_group, include_groups=False)
            .droplevel("__preserve")
            .iloc[:, 0]
        )

        return qty._keep(q_result, attrs=True, name=True, units=True)
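

# Illustrative sketch of WildcardAdapter (hypothetical dimension "t" and labels): a
# row with the wildcard label "*" is broadcast to every coord of `dim` not otherwise
# present in the data.
#
#     wa = WildcardAdapter("t", ["t1", "t2", "t3"])
#     df = pd.DataFrame([["t1", 1.0], ["*", 9.0]], columns=["t", "value"])
#     wa(df)  # rows: t1=1.0, plus broadcast t2=9.0 and t3=9.0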


def _load(
    var: dict, base_path: Path, *parts: str, default_suffix: Optional[str] = None
) -> Any:
    """Helper for :func:`.load_package_data` and :func:`.load_private_data`."""
    key = " ".join(parts)
    if key in var:
        log.debug(f"{repr(key)} already loaded; skip")
        return var[key]

    path = _make_path(base_path, *parts, default_suffix=default_suffix)

    if path.suffix == ".yaml":
        import yaml

        with open(path, encoding="utf-8") as f:
            var[key] = yaml.safe_load(f)
    else:
        raise ValueError(path.suffix)

    return var[key]


def _make_path(
    base_path: Path, *parts: str, default_suffix: Optional[str] = None
) -> Path:
    p = base_path.joinpath(*parts)
    return p.with_suffix(p.suffix or default_suffix) if default_suffix else p
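

# Illustrative behaviour of _make_path() (hypothetical arguments):
#
#     _make_path(Path("/base"), "node", "R11", default_suffix=".yaml")
#     # → Path("/base/node/R11.yaml"); a suffix already present in `parts` is kept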


def load_package_data(*parts: str, suffix: Optional[str] = ".yaml") -> Any:
    """Load a :mod:`message_ix_models` package data file and return its contents.

    Data is re-used if already loaded.

    Example
    -------
    The single call:

    >>> info = load_package_data("node", "R11")

    1. loads the metadata file :file:`data/node/R11.yaml`, parsing its contents,
    2. stores those values at ``PACKAGE_DATA["node R11"]`` for use by other code,
       and
    3. returns the loaded values.

    Parameters
    ----------
    parts : iterable of str
        Used to construct a path under :file:`message_ix_models/data/`.
    suffix : str, optional
        File name suffix, including the ".", e.g. :file:`.yaml`.

    Returns
    -------
    dict
        Configuration values that were loaded.
    """
    return _load(
        PACKAGE_DATA,
        MESSAGE_MODELS_PATH / "data",
        *parts,
        default_suffix=suffix,
    )


def load_private_data(*parts: str) -> Mapping:  # pragma: no cover (needs message_data)
    """Load a private data file from :mod:`message_data` and return its contents.

    Analogous to :func:`load_package_data`, but for non-public data.

    Parameters
    ----------
    parts : iterable of str
        Used to construct a path under :file:`data/` in the :mod:`message_data`
        repository.

    Returns
    -------
    dict
        Configuration values that were loaded.

    Raises
    ------
    RuntimeError
        if :mod:`message_data` is not installed.
    """
    if MESSAGE_DATA_PATH is None:
        raise RuntimeError("message_data is not installed")

    return _load(PRIVATE_DATA, MESSAGE_DATA_PATH / "data", *parts)


def local_data_path(*parts, context: Optional["Context"] = None) -> Path:
    """Construct a path for local data.

    The setting ``message local data`` in the user's :ref:`ixmp configuration file
    <ixmp:configuration>` is used as a base path. If this is not configured, the
    current working directory is used.

    Parameters
    ----------
    parts : sequence of str or Path
        Joined to the base path using :meth:`.Path.joinpath`.

    See also
    --------
    :ref:`Choose locations for data <local-data>`
    """
    from .context import Context

    ctx = context or Context.get_instance(-1)

    return ctx.core.local_data.joinpath(*parts)
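

# The ``message local data`` setting can be stored using the ixmp CLI, for example
# (illustrative path):
#
#     $ ixmp config set "message local data" /path/for/local/data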


def package_data_path(*parts) -> Path:
    """Construct a path to a file under :file:`message_ix_models/data/`.

    Use this function to access data packaged and installed with
    :mod:`message_ix_models`.

    Parameters
    ----------
    parts : sequence of str or Path
        Joined to the base path using :meth:`~pathlib.PurePath.joinpath`.

    See also
    --------
    :ref:`Choose locations for data <package-data>`
    """
    return _make_path(MESSAGE_MODELS_PATH / "data", *parts)


def private_data_path(*parts) -> Path:
    """Construct a path to a file under :file:`data/` in :mod:`message_data`.

    Use this function to access non-public (for instance, embargoed or proprietary)
    data stored in the :mod:`message_data` repository.

    If the repository is not available, the function falls back to
    :meth:`.Context.get_local_path`, where users may put files obtained through
    other channels.

    Parameters
    ----------
    parts : sequence of str or Path
        Joined to the base path using :meth:`~pathlib.PurePath.joinpath`.

    See also
    --------
    :ref:`Choose locations for data <private-data>`
    """
    if HAS_MESSAGE_DATA:
        return _make_path(cast(Path, MESSAGE_DATA_PATH) / "data", *parts)
    else:
        from .context import Context

        base = Context.get_instance(-1).get_local_path()
        once(log, logging.WARNING, f"message_data not installed; fall back to {base}")
        return base.joinpath(*parts)