import logging
from collections import ChainMap, defaultdict
from collections.abc import Callable, Collection, Mapping, MutableMapping, Sequence
from datetime import datetime
from functools import partial, update_wrapper
from importlib.metadata import version
from itertools import count
from pathlib import Path
from typing import TYPE_CHECKING, Optional, Protocol, Union
import message_ix
import pandas as pd
import pint
from platformdirs import user_cache_path
from ._convert_units import convert_units, series_of_pint_quantity
from ._logging import mark_time, preserve_log_level, silence_log
from .cache import cached
from .common import (
HAS_MESSAGE_DATA,
MESSAGE_DATA_PATH,
MESSAGE_MODELS_PATH,
Adapter,
MappingAdapter,
load_package_data,
load_private_data,
local_data_path,
package_data_path,
private_data_path,
)
from .node import adapt_R11_R12, adapt_R11_R14, identify_nodes, nodes_ex_world
from .scenarioinfo import ScenarioInfo, Spec
from .sdmx import CodeLike, as_codes, eval_anno
if TYPE_CHECKING:
import genno
__all__ = [
"HAS_MESSAGE_DATA",
"MESSAGE_DATA_PATH",
"MESSAGE_MODELS_PATH",
"Adapter",
"MappingAdapter",
"adapt_R11_R12",
"adapt_R11_R14",
"add_par_data",
"aggregate_codes",
"as_codes",
"broadcast",
"cached",
"check_support",
"convert_units",
"copy_column",
"datetime_now_with_tz",
"eval_anno",
"ffill",
"identify_nodes",
"iter_keys",
"load_package_data",
"load_private_data",
"local_data_path",
"make_io",
"make_matched_dfs",
"make_source_tech",
"mark_time",
"maybe_query",
"merge_data",
"minimum_version",
"package_data_path",
"path_fallback",
"preserve_log_level",
"private_data_path",
"replace_par_data",
"same_node",
"same_time",
"series_of_pint_quantity",
"show_versions",
"silence_log",
"strip_par_data",
]
log = logging.getLogger(__name__)
def add_par_data(
scenario: message_ix.Scenario,
data: Mapping[str, pd.DataFrame],
dry_run: bool = False,
):
"""Add `data` to `scenario`.
    Parameters
    ----------
    scenario
        Scenario to which to add `data`.
    data
        Mapping from parameter names to data frames of values to be added.
    dry_run : bool, optional
        Only show what would be done.

    Returns
    -------
    int
        The total number of rows in `data`.

See also
--------
strip_par_data
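
    Examples
    --------
    An illustrative sketch; `scenario` and the parameter data are hypothetical:

    >>> data = dict(
    ...     technical_lifetime=make_df(
    ...         "technical_lifetime", node_loc="n", technology="t", value=20.0
    ...     )
    ... )
    >>> add_par_data(scenario, data, dry_run=True)  # Log, but do not add
    1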
"""
# TODO optionally add units automatically
# TODO allow units column entries to be pint.Unit objects
total = 0
for par_name, values in data.items():
N = values.shape[0]
log.info(f"{N} rows in {repr(par_name)}")
log.debug("\n" + values.to_string(max_rows=5))
total += N
if dry_run:
continue
# Work around iiasa/ixmp#425
values["unit"] = values["unit"].str.replace("^$", "-", regex=True)
try:
scenario.add_par(par_name, values)
except Exception: # pragma: no cover
print(values.head())
raise
return total
def aggregate_codes(df: pd.DataFrame, dim: str, codes):  # pragma: no cover
"""Aggregate `df` along dimension `dim` according to `codes`."""
raise NotImplementedError
# Construct an inverse mapping
mapping = {}
for code in codes:
mapping.update({child.id: code.id for child in code.child})
for key, group_series in df.groupby(dim):
print(key, group_series.replace({dim: mapping}))
def broadcast(
df: pd.DataFrame, labels: Optional[pd.DataFrame] = None, **kwargs
) -> pd.DataFrame:
"""Fill missing data in `df` by broadcasting.
:func:`broadcast` is suitable for use with partly-filled data frames returned by
:func:`.message_ix.util.make_df`, with 1 column per dimension, plus a "value"
column. It is also usable with :meth:`pandas.DataFrame.pipe` for chained operations.
    `labels` (if any) are handled first: one copy of `df` is produced for each row
    (set of labels) in this argument. Then `kwargs` are handled; :func:`broadcast`
    returns one copy for each element in the cartesian product of the dimension
    labels given by `kwargs`.
Parameters
----------
labels : pandas.DataFrame
Each column (dimension) corresponds to one in `df`. Each row represents one
matched set of labels for those dimensions.
kwargs
Keys are dimensions. Values are labels along that dimension to fill.
Returns
-------
pandas.DataFrame
The length is either 1 or an integer multiple of the length of `df`.
Raises
------
ValueError
if any of the columns in `labels` or `kwargs` are not present in `df`, or if
those columns are present but not empty.
Examples
--------
    >>> from message_ix import make_df
    >>> from message_ix_models.util import broadcast

    # Create a base data frame with some empty columns
    >>> base = make_df("input", technology="t", value=[1.1, 2.2])

    # Broadcast (duplicate) the data across 2 dimensions
    >>> df = base.pipe(broadcast, node_loc=["node A", "node B"], mode=["m0", "m1"])

    # Show part of the result
    >>> df.dropna(axis=1)
mode node_loc technology value
0 m0 node A t 1.1
1 m0 node A t 2.2
2 m0 node B t 1.1
3 m0 node B t 2.2
4 m1 node A t 1.1
5 m1 node A t 2.2
6 m1 node B t 1.1
7 m1 node B t 2.2
"""
def _check_dim(d):
try:
if not df[d].isna().all():
raise ValueError(f"Dimension {d} was not empty\n\n{df.head()}")
except KeyError:
raise ValueError(f"Dimension {d} not among {list(df.columns)}")
# Broadcast using matched labels for 1+ dimensions from a data frame
if labels is not None:
# Check the dimensions
for dim in labels.columns:
_check_dim(dim)
# Concatenate 1 copy of `df` for each row in `labels`
df = pd.concat(
[df.assign(**row) for _, row in labels.iterrows()],
ignore_index=True,
sort=False,
)
# Next, broadcast other dimensions given as keyword arguments
for dim, levels in kwargs.items():
_check_dim(dim)
if len(levels) == 0:
log.debug(
f"Don't broadcast over {repr(dim)}; labels {levels} have length 0"
)
continue
# - Duplicate the data
# - Drop the existing column named 'dim'
# - Re-add the column from the constructed MultiIndex
# - Reindex for sequential row numbers
df = (
pd.concat([df] * len(levels), keys=levels, names=[dim], sort=False)
.drop(dim, axis=1)
.reset_index(dim)
.reset_index(drop=True)
)
return df
def check_support(context, settings=dict(), desc: str = "") -> None:
"""Check whether a Context is compatible with certain `settings`.
Raises
------
:class:`NotImplementedError`
if any `context` value for a key of `settings` is not among the values in
`settings`.
:class:`KeyError`
if the key is not set on `context` at all.
See also
--------
:ref:`check-support`
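
    Example
    -------
    An illustrative check with hypothetical settings:

    >>> check_support(
    ...     context,
    ...     settings=dict(regions=frozenset(["R11", "R14"])),
    ...     desc="process data",
    ... )

    This raises :class:`NotImplementedError` unless ``context["regions"]`` is one
    of "R11" or "R14".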
"""
__tracebackhide__ = True
for key, values in settings.items():
if context[key] not in values:
raise NotImplementedError(
f"{desc} for {repr(values)}; got {repr(context[key])}"
)
def copy_column(column_name):
"""For use with :meth:`pandas.DataFrame.assign`.
Examples
--------
Modify `df` by filling the column 'baz' with the value ``3``, and copying the column
'bar' into column 'foo'.
>>> df.assign(foo=copy_column('bar'), baz=3)
Note that a similar assignment can be achieved with :meth:`~pandas.DataFrame.eval`:
>>> df.eval("foo = bar")
:func:`copy_column` is useful in the context of more complicated calls to
:meth:`~pandas.DataFrame.assign`.
"""
return lambda df: df[column_name]
def datetime_now_with_tz() -> datetime:
"""Current date and time with time zone information."""
tz = datetime.now().astimezone().tzinfo
return datetime.now(tz)
def ffill(
df: pd.DataFrame, dim: str, values: Sequence[CodeLike], expr: Optional[str] = None
) -> pd.DataFrame:
"""Forward-fill `df` on `dim` to cover `values`.
Parameters
----------
df : pandas.DataFrame
Data to fill forwards.
dim : str
Dimension to fill along. Must be a column in `df`.
values : list of str
Labels along `dim` that must be present in the returned data frame.
expr : str, optional
If provided, :meth:`.DataFrame.eval` is called. This can be used to assign one
column to another. For instance, if `dim` == "year_vtg" and `expr` is "year_act
= year_vtg", then forward filling is performed along the "year_vtg" dimension/
column, and then the filled values are copied to the "year_act" column.
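
    Example
    -------
    A minimal sketch with hypothetical data:

    >>> df = make_df(
    ...     "fix_cost", node_loc="n", technology="t", year_vtg=2020, value=1.0
    ... )
    >>> ffill(df, "year_vtg", [2020, 2025, 2030], expr="year_act = year_vtg")

    The result has additional rows for 2025 and 2030, with "year_act" equal to
    "year_vtg" in those rows.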
"""
if dim in ("value", "unit"):
raise ValueError(dim)
# Mapping from (values existing in `df`) -> equal or greater members of `values`
mapping = defaultdict(set)
last_seen = None
for v in sorted(set(values) | set(df[dim].unique())):
if v in df[dim].unique():
last_seen = v
mapping[last_seen].add(v)
def _maybe_eval(df):
return df.eval(expr) if expr is not None else df
dfs = [df]
for key, group_df in df.groupby(dim):
for new_label in sorted(mapping[key])[1:]:
# Duplicate the data; assign the new_label to `dim`
dfs.append(group_df.assign(**{dim: new_label}).pipe(_maybe_eval))
return pd.concat(dfs, ignore_index=True)
class KeyIterator(Protocol):
def __call__(self) -> "genno.Key": ...
def iter_keys(base: "genno.Key") -> KeyIterator:
    """Return an iterator over a sequence of keys starting with `base`.

    This can be used as shorthand when constructing sequences of :mod:`genno`
    computations.
Example
-------
>>> base_key = genno.Key("foo:a-b-c")
>>> k = iter_keys(base_key)
>>> k()
<foo:a-b-c:0>
>>> k()
<foo:a-b-c:1>
>>> k()
<foo:a-b-c:2>
"""
return partial(next, map(lambda i: base + str(i), count()))
def iter_parameters(set_name, scenario: Optional["message_ix.Scenario"] = None):
"""Iterate over MESSAGEix parameters with *set_name* as a dimension.
.. deprecated:: 2023.11
Use :meth:`ixmp.Scenario.par_list` with the :py:`indexed_by=...` argument
instead.
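
    Example
    -------
    A sketch of the preferred replacement:

    >>> scenario.par_list(indexed_by="technology")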
"""
try:
assert scenario is not None
yield from scenario.items(indexed_by=set_name, par_data=False)
except (TypeError, AssertionError): # ixmp < 3.8.0
import message_ix.models
for name, info in message_ix.models.MESSAGE_ITEMS.items():
if info["ix_type"] == "par" and set_name in info["idx_sets"]:
yield name
def make_io(src, dest, efficiency, on="input", **kwargs):
"""Return input and output data frames for a 1-to-1 technology.
Parameters
----------
src : tuple of str
Input (commodity, level, unit)
dest : tuple of str
Output (commodity, level, unit)
efficiency : float
Conversion efficiency.
    on : 'input' or 'output'
        If 'input', `efficiency` is applied to the input value, so the activity
        level of the technology is measured in units of the output, `dest[2]`. If
        'output', the reverse.
kwargs
Passed to :func:`.make_df`.
Returns
-------
dict (str -> pd.DataFrame)
Keys are 'input' and 'output'; values are data frames.
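
    Example
    -------
    An illustrative call; all labels are hypothetical:

    >>> make_io(
    ...     src=("coal", "primary", "GWa"),
    ...     dest=("electricity", "secondary", "GWa"),
    ...     efficiency=0.35,
    ...     on="input",
    ...     technology="t",
    ... )

    In the result, the "input" data frame has value 0.35 and the "output" data
    frame has value 1.0.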
"""
return dict(
input=message_ix.make_df(
"input",
commodity=src[0],
level=src[1],
unit=src[2],
value=efficiency if on == "input" else 1.0,
**kwargs,
),
output=message_ix.make_df(
"output",
commodity=dest[0],
level=dest[1],
unit=dest[2],
value=1.0 if on == "input" else efficiency,
**kwargs,
),
)
def make_matched_dfs(
base: Union[MutableMapping, pd.DataFrame], **par_value: Union[float, pint.Quantity]
) -> dict[str, pd.DataFrame]:
"""Return data frames derived from `base` for multiple parameters.
Creates one data frame per keyword argument.
Parameters
----------
base : pandas.DataFrame, dict, etc.
        Used to populate other columns of each data frame. Duplicate rows, which
        occur when the target parameter has fewer dimensions than `base`, are
        dropped.
    par_value :
        Argument names (e.g. 'fix_cost') are passed to :func:`.make_df`. If a
        value is :class:`float`, it overwrites the "value" column; if it is a
        :class:`pint.Quantity`, its magnitude overwrites "value" and its units
        fill the "unit" column, as a formatted string.
Returns
-------
    :class:`dict` of :class:`pandas.DataFrame`
        One data frame for each keyword argument in `par_value`.
Examples
--------
>>> input = make_df("input", ...)
    >>> cf_tl = make_matched_dfs(
    ...     input,
    ...     capacity_factor=1,
    ...     technical_lifetime=pint.Quantity(8, "year"),
    ... )
"""
data = ChainMap(dict(), base)
result = {}
for par, value in par_value.items():
data.maps[0] = (
dict(value=value.magnitude, unit=f"{value.units:~}")
if isinstance(value, pint.Quantity)
else dict(value=value)
)
result[par] = (
message_ix.make_df(par, **data).drop_duplicates().reset_index(drop=True)
)
return result
def make_source_tech(
info: Union[message_ix.Scenario, ScenarioInfo], common, **values
) -> dict[str, pd.DataFrame]:
"""Return parameter data for a ‘source’ technology.
The technology has no inputs; its output commodity and/or level are determined by
`common`; either single values, or :obj:`None` if the result will be
:meth:`~pandas.DataFrame.pipe`'d through :func:`broadcast`.
Parameters
----------
info : .Scenario or .ScenarioInfo
common : dict
Passed to :func:`.make_df`.
**values
Values for 'capacity_factor' (optional; default 1.0), 'output', 'var_cost', and
optionally 'technical_lifetime'.
Returns
-------
dict
Suitable for :func:`add_par_data`.
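
    Example
    -------
    A sketch with hypothetical labels and values:

    >>> data = make_source_tech(
    ...     scenario,
    ...     common=dict(
    ...         commodity="c", level="l", mode="M1", technology="t",
    ...         time="year", time_dest="year", unit="GWa",
    ...     ),
    ...     output=1.0,
    ...     var_cost=0.1,
    ... )
    >>> add_par_data(scenario, data)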
"""
# Check arguments
if isinstance(info, message_ix.Scenario):
info = ScenarioInfo(info)
values.setdefault("capacity_factor", 1.0)
missing = {"capacity_factor", "output", "var_cost"} - set(values.keys())
if len(missing):
raise ValueError(f"make_source_tech() needs values for {repr(missing)}")
elif "technical_lifetime" not in values:
log.debug("No technical_lifetime for source technology")
# Create data for "output"
result = dict(
output=message_ix.make_df(
"output",
value=values.pop("output"),
year_act=info.Y,
year_vtg=info.Y,
**common,
)
.pipe(broadcast, node_loc=nodes_ex_world(info.N))
.pipe(same_node)
)
# Add data for other parameters
result.update(make_matched_dfs(base=result["output"], **values))
return result
def maybe_query(series: pd.Series, query: Optional[str]) -> pd.Series:
"""Apply :meth:`pandas.DataFrame.query` if the `query` arg is not :obj:`None`.
:meth:`~pandas.DataFrame.query` is not chainable (`pandas-dev/pandas#37941
<https://github.com/pandas-dev/pandas/issues/37941>`_). Use this function with
:meth:`pandas.Series.pipe`, passing an argument that may be :obj:`None`, to have a
chainable query operation that can be a no-op.
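
    Example
    -------
    A minimal illustration:

    >>> s = pd.Series([1.0, 2.0], index=pd.Index(["a", "b"], name="n"))
    >>> s.pipe(maybe_query, "n == 'a'")  # Only the row with n == "a"
    >>> s.pipe(maybe_query, None)  # No-op; `s` is returned unchanged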
"""
# Convert Series to DataFrame, query(), then retrieve the single column
return series if query is None else series.to_frame().query(query)[0]
def merge_data(
base: MutableMapping[str, pd.DataFrame], *others: Mapping[str, pd.DataFrame]
) -> None:
"""Merge dictionaries of DataFrames together into `base`."""
for other in others:
for par, df in other.items():
base[par] = pd.concat([base.get(par, None), df])
def minimum_version(expr: str) -> Callable:
"""Decorator for functions that require a minimum version of some upstream package.
If the decorated function is called and the condition in `expr` is not met,
:class:`.NotImplementedError` is raised with an informative message.
    The decorated function gains an attribute :py:`.minimum_version`, a second
    decorator that can be used on associated test code. This marks the test as
    XFAIL when it raises :class:`NotImplementedError` or :class:`RuntimeError`
    (the latter occurs with, for instance, :mod:`click` testing).
See :func:`.prepare_reporter` / :func:`.test_prepare_reporter` for a usage example.
Parameters
----------
expr :
Like "example 1.2.3.post0". The condition for the decorated function is that
the installed version must be equal to or greater than this version.
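
    Example
    -------
    A sketch of typical usage; the names are hypothetical:

    >>> @minimum_version("message_ix 3.8")
    ... def my_feature():
    ...     ...  # Code that requires message_ix 3.8 or greater

    To mark associated test code:

    >>> @my_feature.minimum_version
    ... def test_my_feature():
    ...     ...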
"""
from packaging.version import parse
package, v_min = expr.split(" ")
v_package = version(package)
condition = parse(v_package) < parse(v_min)
message = f" with {package} {v_package} < {v_min}"
# Create the decorator
def decorator(func):
name = f"{func.__module__}.{func.__name__}()"
# Wrap `func`
def wrapper(*args, **kwargs):
if condition:
raise NotImplementedError(f"{name}{message}.")
return func(*args, **kwargs)
update_wrapper(wrapper, func)
# Create a test function decorator
def marker(test_func):
# Import pytest only when there is a test function to mark
import pytest
# Create the mark
mark = pytest.mark.xfail(
condition=condition,
raises=(NotImplementedError, RuntimeError),
reason=f"Not supported{message}",
)
# Attach to the test function
try:
test_func.pytestmark.append(mark)
except AttributeError:
test_func.pytestmark = [mark]
return test_func
# Store the decorator on the wrapped function
setattr(wrapper, "minimum_version", marker)
return wrapper
return decorator
def path_fallback(
*parts: Union[str, Path],
where: Union[str, list[Union[str, Path]]] = "",
) -> Path:
"""Locate a path constructed from `parts` found in the first of several directories.
This allows to implement ‘fallback’ behaviour in which files or directories in
certain locations are used preferentially.
Parameters
----------
parts :
Path parts or fragments such as directory names and a final file name.
where :
Either:
- :class:`str` containing one or more of:
- "cache": locate `parts` in the :mod:`message_ix_models` cache directory.
- "local": locate `parts in the user's local data directory (same as
:func:`local_data_path`).
- "package": locate `parts` in :mod:`message_ix_models` package data (same
as :func:`.package_data_path`).
- "private": locate `parts` in the :mod:`message_data` :file:`/data/`
directory (same as :func:`.private_data_path`).
- "test": locate test data in :py:`package_data_path("test", ...)`
- :class:`list` where each element is :class:`str` (one of the above) or a
:class:`pathlib.Path`.
Returns
-------
pathlib.Path
The first of the locations indicated by `where` in which the file or directory
`parts` exists.
Raises
------
ValueError
If `where` is empty or `parts` are not found in any of the indicated locations.
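
    Example
    -------
    An illustrative lookup; the path parts are hypothetical:

    >>> path_fallback("my-module", "config.yaml", where="private package")
    PosixPath('.../data/my-module/config.yaml')

    This returns the path under :func:`private_data_path` if the file exists
    there, or else the path under :func:`package_data_path`.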
"""
dirs = []
for item in where.split() if isinstance(where, str) else where:
if isinstance(item, str):
if item == "cache":
dirs.append(user_cache_path("message-ix-models"))
elif item == "local":
dirs.append(local_data_path())
elif item == "package":
dirs.append(package_data_path())
elif item == "private":
dirs.append(private_data_path())
elif item == "test":
dirs.append(package_data_path("test"))
else:
dirs.append(item)
for path in [d.joinpath(*parts) for d in dirs]:
if not path.exists():
log.debug(f"Not found: {path}")
continue
return path
if not dirs:
raise ValueError(f"No directories identified among {where!r}")
else:
raise ValueError(f"'{Path(*parts)!s}' not found in any of {dirs}")
def replace_par_data(
scenario: message_ix.Scenario,
parameters: Union[str, Sequence[str]],
filters: Mapping[str, Union[str, int, Collection[str], Collection[int]]],
to_replace: Mapping[str, Union[Mapping[str, str], Mapping[int, int]]],
) -> None:
"""Replace data in `parameters` of `scenario`.
Parameters
----------
scenario
Scenario in which to replace data.
parameters : str or sequence of str
Name(s) of parameters in which to replace data.
filters
Passed to :meth:`.Scenario.par` argument of the same name.
to_replace
Passed to :meth:`pandas.DataFrame.replace` argument of the same name.
Examples
--------
Replace data in the "relation_activity" parameter for a particular technology and
relation: assign the same values as entries in a different relation name for the
same technology.
>>> replace_par_data(
... scenario,
... "relation_activity",
... dict(technology="hp_gas_i", relation="CO2_r_c"),
... dict(relation={"CO2_r_c": "CO2_ind"}),
... )
"""
from message_ix_models.model.build import apply_spec
pars = parameters.split() if isinstance(parameters, str) else parameters
    # Create a Spec that requires `scenario` to have all the set elements mentioned
    # by `filters` and/or `to_replace`
s = Spec()
for k, v in filters.items():
s.require.set[k].extend([v] if isinstance(v, (str, int)) else v)
for k, v in to_replace.items():
s.require.set[k].extend(v.keys())
s.require.set[k].extend(v.values())
# Use apply_spec() simply to check that `scenario` contains the expected items,
# before attempting to modify data
apply_spec(scenario, s)
msg = f"Replace {filters!r} with {to_replace!r}"
log.info(msg)
for par_name in pars:
with scenario.transact(f"{msg} in {par_name!r}"):
# Base data, to be replaced
to_remove = scenario.par(par_name, filters=filters)
# Remove the base data
scenario.remove_par(par_name, to_remove.drop(columns=["value", "unit"]))
# Add the modified data
scenario.add_par(par_name, to_remove.replace(to_replace))
log.info(f"{len(to_remove)} obs in {par_name!r}")
def same_node(df: pd.DataFrame, from_col="node_loc") -> pd.DataFrame:
"""Fill 'node_{,dest,loc,origin,rel,share}' in `df` from `from_col`."""
cols = list(
set(df.columns)
& ({"node", "node_loc", "node_origin", "node_dest", "node_rel", "node_share"})
- {from_col}
)
return df.assign(**{c: copy_column(from_col) for c in cols})
def same_time(df: pd.DataFrame) -> pd.DataFrame:
"""Fill 'time_origin'/'time_dest' in `df` from 'time'."""
cols = list(set(df.columns) & {"time_origin", "time_dest"})
return df.assign(**{c: copy_column("time") for c in cols})
def show_versions() -> str:
"""Output of :func:`ixmp.show_versions`, as a :class:`str`."""
from io import StringIO
    import ixmp

    from ._logging import preserve_log_handlers
# Retrieve package versions
buf = StringIO()
# show_versions() imports pyam-iamc, which in turn imports ixmp4, which removes all
# handlers from the root logger (?!). Preserve the message-ix-models logging config.
with preserve_log_handlers():
ixmp.show_versions(buf)
return buf.getvalue()
# FIXME Reduce complexity from 14 to ≤13
def strip_par_data(  # noqa: C901
scenario: message_ix.Scenario,
set_name: str,
element: str,
dry_run: bool = False,
dump: Optional[dict[str, pd.DataFrame]] = None,
) -> int:
"""Remove `element` from `set_name` in scenario, optionally dumping to `dump`.
Parameters
----------
dry_run : bool, optional
If :data:`True`, only show what would be done.
dump : dict, optional
If provided, stripped data are stored in this dictionary. Otherwise, they are
discarded.
Returns
-------
int
Total number of rows removed across all parameters.
See also
--------
add_par_data
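
    Examples
    --------
    An illustrative call; the set element is hypothetical:

    >>> dump: dict[str, pd.DataFrame] = {}
    >>> strip_par_data(scenario, "technology", "t_old", dump=dump)

    After the call, "t_old" is removed from the "technology" set, and `dump` maps
    parameter names to the rows removed from each.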
"""
par_list = scenario.par_list()
no_data = set() # Names of parameters with no data being stripped
total = 0 # Total observations stripped
if dump is None:
pars = [] # Don't iterate over parameters unless dumping
else:
log.info(
f"Remove data with {set_name}={element!r}"
+ (" (DRY RUN)" if dry_run else "")
)
# Iterate over parameters with ≥1 dimensions indexed by `set_name`
pars = iter_parameters(set_name, scenario=scenario)
for par_name in pars:
if par_name not in par_list: # pragma: no cover
log.warning(
f" MESSAGEix parameter {par_name!r} missing in Scenario {scenario.url}"
)
continue
# Iterate over dimensions indexed by `set_name`
for dim, _ in filter(
lambda item: item[1] == set_name,
zip(scenario.idx_names(par_name), scenario.idx_sets(par_name)),
):
# Check for contents of par_name that include `element`
par_data = scenario.par(par_name, filters={dim: element})
N = len(par_data)
total += N
if N == 0:
# No data; no need to do anything further
no_data.add(par_name)
continue
elif dump is not None:
dump[par_name] = pd.concat(
[dump.get(par_name, pd.DataFrame()), par_data]
)
log.info(f" {N} rows in {par_name!r}")
# Show some debug info
for col in filter(
lambda c: c != set_name and c in par_data.columns,
("commodity", "level", "technology"),
):
log.info(f" with {col}={sorted(par_data[col].unique())}")
if dry_run:
continue
# Actually remove the data
scenario.remove_par(par_name, key=par_data)
# NB would prefer to do the following, but raises an exception:
# scenario.remove_par(par_name, key={set_name: [value]})
if not dry_run and dump is not None:
log.info(f" {total} rows total")
if no_data:
log.debug(f"No data removed from {len(no_data)} other parameters")
if not dry_run:
log.info(f"Remove {element!r} from set {set_name!r}")
try:
scenario.remove_set(set_name, element)
except Exception as e:
if "does not have an element" in str(e):
log.info(" …not found")
else: # pragma: no cover
raise
return total