"""Simulated solution data for testing :mod:`~message_ix_models.report`."""
import logging
from collections import ChainMap, defaultdict
from collections.abc import Mapping, Sequence
from copy import deepcopy
from dataclasses import dataclass
from functools import cache, lru_cache, partial
from pathlib import Path
from typing import TYPE_CHECKING, Any, Optional, Union
import genno
import pandas as pd
from dask.core import quote
from genno import Key, KeyExistsError
from message_ix import Reporter
from pandas.api.types import is_scalar
from message_ix_models import ScenarioInfo
from message_ix_models.util import minimum_version
from message_ix_models.util._logging import mark_time, silence_log
from message_ix_models.util.ixmp import rename_dims
if TYPE_CHECKING:
from genno.types import AnyQuantity
from message_ix.models import Item
from pandas import ExcelFile
# Public names of this module, i.e. what ``from <module> import *`` exports.
__all__ = [
"add_simulated_solution",
"data_from_file",
"simulate_qty",
"to_simulate",
]
# Module-level logger, named after this module's import path.
log = logging.getLogger(__name__)
@dataclass
class MockScenario:
    """Object to mock a :class:`.Scenario` with data from a :file:`.xlsx` file.

    For use with :func:`.reporter_from_excel`.

    Every reader method loads one sheet of `_file` with :func:`pandas.read_excel`.
    Any attribute that is not defined on this class is looked up on `_info` instead.

    .. note:: The reader methods are wrapped with :func:`functools.cache`. The cache
       key includes the instance, which hashes as its `_file` (see
       :meth:`__hash__`); the cache is unbounded and keeps a reference to every
       instance used as a key.
    """

    # Scenario information; target of the attribute delegation in __getattr__()
    _info: "ScenarioInfo"
    # Workbook passed as the first argument to every pandas.read_excel() call
    _file: "ExcelFile"

    @cache
    def cat(self, name: str, cat: str):
        """Return the elements of set `name` that are in category `cat`.

        Reads the sheet ``cat_{name}``, keeps the rows whose ``type_{name}`` column
        equals `cat`, and returns the `name` column as a list.
        """
        return (
            pd.read_excel(self._file, sheet_name=f"cat_{name}")
            .query(f"type_{name} == {cat!r}")[name]
            .to_list()
        )

    @cache
    def par(self, name: str):
        """Return the contents of the sheet `name` as a :class:`pandas.DataFrame`."""
        return pd.read_excel(self._file, sheet_name=name)

    def _par_as_qty(self, name, dims):
        """Return the data for parameter `name` as a :class:`genno.Quantity`.

        `dims` maps column names in the sheet to dimension IDs. The renamed columns
        become the index; the "value" column supplies the data.
        """
        return genno.Quantity(
            self.par(name).rename(columns=dims).set_index(list(dims.values()))["value"]
        )

    @cache
    def set(self, name: str):
        """Return the contents of the sheet for set `name`.

        A sheet with exactly one column is returned as a list of that column's
        values; any other sheet is returned as the whole data frame.
        """
        df = pd.read_excel(self._file, sheet_name=name)
        return df.iloc[:, 0].to_list() if len(df.columns) == 1 else df

    def has_solution(self):
        """Return :obj:`True`, unconditionally."""
        return True

    def vintage_and_active_years(self):
        """Return :obj:`None`, unconditionally."""
        return None

    def _items_of_type(self, ix_type: str):
        """Return the "item" entries of sheet "ix_type_mapping" with the given `ix_type`."""
        return (
            pd.read_excel(self._file, sheet_name="ix_type_mapping")
            .query(f"ix_type == {ix_type!r}")["item"]
            .to_list()
        )

    @cache
    def par_list(self):
        """Return the names of all items listed with ix_type "par"."""
        return self._items_of_type("par")

    @cache
    def set_list(self):
        """Return the names of all items listed with ix_type "set"."""
        return self._items_of_type("set")

    def __getattr__(self, name):
        """Delegate lookup of any attribute not found on this object to `_info`."""
        # This method only runs after normal lookup has failed. If one of the fields
        # itself is missing (for instance on a bare object created with __new__(), as
        # the copy and pickle machinery does before restoring state), then reading
        # self._info below would re-enter this method endlessly. Fail with the
        # ordinary AttributeError instead so that hasattr() and friends keep working.
        if name in ("_info", "_file"):
            raise AttributeError(name)
        return getattr(self._info, name)

    def __hash__(self):
        """Hash as the underlying file, so cached reads are keyed on it."""
        return hash(self._file)
def dims_of(info: "Item") -> dict[str, str]:
    """Return a mapping from the full index names to short dimension IDs of `info`.

    The names are taken from ``info.dims`` or, if that is empty, ``info.coords``. A
    name with no entry in :func:`.rename_dims` maps to itself.
    """
    # Fetch the renaming table once rather than once per dimension
    short_ids = rename_dims()
    return {d: short_ids.get(d, d) for d in (info.dims or info.coords or [])}
[docs]
@minimum_version("message_ix 3.7.0.post0")
@lru_cache(1)
def to_simulate():
    """Return items to be included in a simulated solution.

    The result contains a deep copy of every entry of ``MESSAGE.items``, plus the
    ``MACRO.items`` entries "GDP" and "MERtoPPP".
    """
    from message_ix.models import MACRO, MESSAGE

    macro_names = ("GDP", "MERtoPPP")

    # Items to be included in a simulated solution: everything declared for MESSAGE…
    items = deepcopy(MESSAGE.items)
    # …and a few MACRO variables
    items.update({macro_name: MACRO.items[macro_name] for macro_name in macro_names})

    return items
def reporter_from_excel(path: "Path") -> "Reporter":
    """Return a :class:`.Reporter` that provides its data from an Excel file.

    The file must be of the format generated by :meth:`.Scenario.to_excel`.

    The returned reporter's graph holds a :class:`.ScenarioInfo` under
    "scenario info", the open :class:`pandas.ExcelFile` under "_file", and a
    :class:`MockScenario` wrapping both under "scenario".

    .. todo:: Move upstream to a new method :meth:`ixmp.Reporter.from_excel`.
    """
    import pandas as pd

    from message_ix_models.util.ixmp import rename_dims

    rep = Reporter()

    # Create the supporting objects and store each one in the graph
    info = ScenarioInfo(model="m", scenario="s")
    rep.graph["scenario info"] = info

    workbook = pd.ExcelFile(path)
    rep.graph["_file"] = workbook

    mock = MockScenario(info, workbook)
    rep.graph["scenario"] = mock

    # One task per set, stored under the short dimension ID where one exists
    for full_name in mock.set_list():
        rep.add(rename_dims().get(full_name, full_name), partial(mock.set, full_name))

    # One task per parameter, keyed by the parameter name and its short dimensions
    for parameter in mock.par_list():
        dim_map = dims_of(to_simulate()[parameter])
        rep.add(
            Key(parameter, list(dim_map.values())),
            partial(mock._par_as_qty, parameter, dim_map),
        )

    # Pre-populate some sets of `info` by computing them from the file
    for set_name in ("commodity", "node", "year"):
        info.set[set_name] = rep.get(rename_dims()[set_name])

    return rep
[docs]
def simulate_qty(
    name: str, dims: list[str], item_data: Union[dict, pd.DataFrame, None]
) -> "AnyQuantity":
    """Return simulated data for item `name`.

    Parameters
    ----------
    name :
        Name given to the resulting quantity; also used in error messages.
    dims :
        Dimensions of the resulting quantity.
    item_data :
        Optional data for the quantity. Either a :class:`dict` with entries for some
        or all of `dims` and "value" (missing entries are filled with :obj:`None`),
        or a complete :class:`pandas.DataFrame`, whose columns are renamed using
        :func:`.rename_dims`. :obj:`None` is handled like an empty :class:`dict`.

    Raises
    ------
    AssertionError
        If the data are partly (but not entirely) missing, or contain duplicate rows.
    """
    if item_data is None:
        # No data at all: same as an empty mapping, i.e. one all-missing row
        item_data = {}

    if isinstance(item_data, dict):
        # NB this is code lightly modified from make_df
        # One entry per column; None for anything not supplied
        columns: dict[str, Any] = {c: item_data.get(c) for c in [*dims, "value"]}

        if all(is_scalar(v) for v in columns.values()):
            # All values are scalars, so the constructor requires an index to be
            # passed explicitly.
            df = pd.DataFrame(data=columns, index=[0])
        else:
            df = pd.DataFrame(data=columns)
    else:
        # Provided complete data frame
        df = item_data.rename(columns=rename_dims())

    # Data must be entirely empty, or complete
    # NB the message only uses names that are bound on both branches above
    missing = df.isna()
    assert not missing.any().any() or missing.all().all(), (
        f"Incomplete data for simulated {name!r}: {item_data!r}"
    )

    assert not df.duplicated().any(), f"Duplicate data for simulated {repr(name)}"

    return genno.Quantity(df.set_index(dims)["value"] if len(dims) else df, name=name)
[docs]
def data_from_file(path: Path, *, name: str, dims: Sequence[str]) -> "AnyQuantity":
    """Read simulated solution data for item `name` from `path`.

    For variables and equations (`name` in upper case), the file **must** have columns
    corresponding to `dims` followed by "Val", "Marginal", "Lower", "Upper", and
    "Scale". The "Val" column is returned.

    For parameters, the file **must** have columns corresponding to `dims` followed by
    "value" and "unit". The "value" column is returned.

    In both cases the column labels found in the file are discarded and replaced by
    position.
    """
    index_cols = list(dims)

    if not name.isupper():
        # Parameter data
        labels = index_cols + ["value", "unit"]
        raw = pd.read_csv(path, engine="pyarrow")
        # Drop a leading index column that appears in some files
        # TODO Adjust .snapshot.unpack() to avoid generating this column; update
        #      data; then remove this call
        raw = raw.drop(columns="", errors="ignore")
        indexed = raw.set_axis(labels, axis=1).set_index(index_cols)
        # TODO pass units if they are unique
        return genno.Quantity(indexed["value"], name=name)

    # Variable or equation data
    # NB Must assign the dimensions directly; they cannot be read from the file, as
    #    the column headers are the internal GAMS set names (e.g. "year_all")
    #    instead of the index names from message_ix.
    labels = index_cols + ["Val", "Marginal", "Lower", "Upper", "Scale"]
    raw = pd.read_csv(path, engine="pyarrow")
    levels = raw.set_axis(labels, axis=1).set_index(index_cols)["Val"]
    return genno.Quantity(levels, name=name)
[docs]
@minimum_version("message_ix 3.6")
def add_simulated_solution(
    rep: Reporter,
    info: ScenarioInfo,
    data: Optional[dict] = None,
    path: Optional[Path] = None,
):
    """Add a simulated model solution to `rep`.

    For every item returned by :func:`to_simulate`, one of the following is done, in
    order of preference:

    1. If `path` contains a file for the item, add a task that loads it with
       :func:`data_from_file`.
    2. For a set, add its elements from `info`.
    3. For a parameter or variable, add a task that calls :func:`simulate_qty`,
       unless `rep` already has a key for the item and `data` has nothing for it.

    Finally, the base MESSAGEix reporting tasks are added with
    :meth:`.Reporter.add_tasks`.

    Parameters
    ----------
    data : dict, optional
        If given, a mapping from MESSAGE item (set, parameter, or variable) names to
        inputs (:class:`dict` or :class:`pandas.DataFrame`) that are passed to
        :func:`simulate_qty`.
    path : Path, optional
        If given, a path to a directory containing one or more files with names like
        :file:`ACT.csv.gz`. These files are taken as containing "simulated" model
        solution data for the MESSAGE variable with the same name. See
        :func:`data_from_file`.
    """
    from ixmp.backend import ItemType

    rep.configure(
        rename_dims=dict(
            node_rel="nr",
            year_rel="yr",
        ),
    )

    mark_time()
    N = len(rep.graph)

    # Ensure "scenario" is present in the graph
    rep.graph.setdefault("scenario", None)

    # Add simulated data
    if data is None:
        data = {}

    for name, item_info in to_simulate().items():
        dims = list(dims_of(item_info).values())
        key = Key(name, dims)

        # Add a task to load data from a file in `path`, if it exists
        # NB explicit checks, not assert: these must still run under `python -O`
        if path is not None:
            p = path.joinpath(name).with_suffix(".csv.gz")
            if p.exists():
                # Add data from file
                rep.add(key, data_from_file, p, name=name, dims=key.dims, sums=True)
                continue

        # NOTE(review): membership is tested for `name`, but the task is added under
        # the renamed key; confirm this is intended.
        if item_info.type == ItemType.SET and name not in rep:
            # Add the set elements from `info`
            rep.add(rename_dims().get(name, name), quote(info.set[name]))
        elif item_info.type in (ItemType.PAR, ItemType.VAR):
            # Retrieve an existing key for `name`
            try:
                full_key = rep.full_key(name)
            except KeyError:
                full_key = None  # Not present in `rep`

            # Simulated data for name
            item_data = data.get(name)
            # NB len() works for both dict and DataFrame; bool() of a DataFrame raises
            no_data = item_data is None or len(item_data) == 0

            if full_key and no_data:
                # Don't overwrite existing task with empty data
                continue

            # Add a task to simulate data for this quantity
            # NB data.get() can return None, but simulate_qty() needs item_data to not
            #    be None, so substitute an empty mapping
            rep.add(
                key,
                simulate_qty,
                name=name,
                dims=dims,
                item_data={} if item_data is None else item_data,
                sums=True,
            )

    log.info("%d keys", len(rep.graph) - N)
    mark_time()

    # Prepare the base MESSAGEix computations
    with silence_log("genno", logging.CRITICAL):
        try:
            rep.add_tasks()
        except KeyExistsError:
            pass  # `rep` was produced with Reporter.from_scenario()

    log.info("%d total keys", len(rep.graph))
    mark_time()