import logging
from collections import ChainMap, defaultdict
from collections.abc import Collection, Iterable, Mapping, MutableMapping, Sequence
from datetime import datetime
from functools import partial, singledispatch
from itertools import count
from pathlib import Path
from typing import TYPE_CHECKING, Any, Literal, Optional, Protocol, Union
import message_ix
import pandas as pd
import pint
from ._convert_units import convert_units
from ._logging import mark_time, once, preserve_log_level, silence_log
from .cache import cached
from .common import (
HAS_MESSAGE_DATA,
MESSAGE_DATA_PATH,
MESSAGE_MODELS_PATH,
Adapter,
MappingAdapter,
WildcardAdapter,
load_package_data,
load_private_data,
local_data_path,
package_data_path,
private_data_path,
)
from .importlib import minimum_version
from .node import adapt_R11_R12, adapt_R11_R14, identify_nodes, nodes_ex_world
from .scenarioinfo import ScenarioInfo, Spec
from .sdmx import CodeLike, as_codes, eval_anno
if TYPE_CHECKING:
import genno
from message_ix_models.types import MutableParameterData, ParameterData
from .context import Context
__all__ = [
"HAS_MESSAGE_DATA",
"MESSAGE_DATA_PATH",
"MESSAGE_MODELS_PATH",
"Adapter",
"MappingAdapter",
"WildcardAdapter",
"adapt_R11_R12",
"adapt_R11_R14",
"add_par_data",
"aggregate_codes",
"as_codes",
"broadcast",
"cached",
"check_support",
"convert_units",
"copy_column",
"datetime_now_with_tz",
"eval_anno",
"ffill",
"identify_nodes",
"iter_keys",
"load_package_data",
"load_private_data",
"local_data_path",
"make_io",
"make_matched_dfs",
"make_source_tech",
"mark_time",
"maybe_query",
"merge_data",
"minimum_version",
"package_data_path",
"path_fallback",
"preserve_log_level",
"private_data_path",
"replace_par_data",
"same_node",
"same_time",
"show_versions",
"silence_log",
"strip_par_data",
]
log = logging.getLogger(__name__)
def add_par_data(
scenario: message_ix.Scenario, data: "ParameterData", dry_run: bool = False
) -> int:
"""Add `data` to `scenario`.
Parameters
----------
data
Any mapping with keys that are valid :mod:`message_ix` parameter names, and
        values that are :class:`pandas.DataFrame` or other arguments valid for
        :meth:`message_ix.Scenario.add_par`.
    dry_run : bool, optional
Only show what would be done.
See also
--------
strip_par_data
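    Examples
    --------
    An illustrative sketch, assuming ``scenario`` is an existing
    :class:`message_ix.Scenario` and the data frame is complete:
    >>> from message_ix import make_df
    >>> data = {"demand": make_df("demand", node="node A", value=[1.0, 2.0], ...)}
    >>> add_par_data(scenario, data)
    2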
"""
# TODO optionally add units automatically
# TODO allow units column entries to be pint.Unit objects
total = 0
for par_name, values in data.items():
N = values.shape[0]
log.info(f"{N} rows in {repr(par_name)}")
log.debug("\n" + values.to_string(max_rows=5))
total += N
if dry_run:
continue
# Work around iiasa/ixmp#425
values["unit"] = values["unit"].str.replace("^$", "-", regex=True)
try:
scenario.add_par(par_name, values)
except Exception: # pragma: no cover
print(values.head())
raise
return total
def aggregate_codes(df: pd.DataFrame, dim: str, codes): # pragma: no cover
"""Aggregate `df` along dimension `dim` according to `codes`."""
raise NotImplementedError
# Construct an inverse mapping
mapping = {}
for code in codes:
mapping.update({child.id: code.id for child in code.child})
for key, group_series in df.groupby(dim):
print(key, group_series.replace({dim: mapping}))
def broadcast(
df: pd.DataFrame, labels: Optional[pd.DataFrame] = None, **kwargs
) -> pd.DataFrame:
"""Fill missing data in `df` by broadcasting.
:func:`broadcast` is suitable for use with partly-filled data frames returned by
:func:`.message_ix.util.make_df`, with 1 column per dimension, plus a "value"
column. It is also usable with :meth:`pandas.DataFrame.pipe` for chained operations.
    `labels` (if any) is handled first: one copy of `df` is produced for each row (set
    of labels) in this argument. Then, `kwargs` are handled; :func:`broadcast` returns
    one copy for each element in the cartesian product of the dimension labels given by
    `kwargs`.
Parameters
----------
labels : pandas.DataFrame
Each column (dimension) corresponds to one in `df`. Each row represents one
matched set of labels for those dimensions.
kwargs
Keys are dimensions. Values are labels along that dimension to fill.
Returns
-------
    pandas.DataFrame
        The length is an integer multiple (including 1×) of the length of `df`.
Raises
------
ValueError
if any of the columns in `labels` or `kwargs` are not present in `df`, or if
those columns are present but not empty.
Examples
--------
>>> from message_ix import make_df
>>> from message_ix_models.util import broadcast
# Create a base data frame with some empty columns
>>> base = make_df("input", technology="t", value=[1.1, 2.2])
# Broadcast (duplicate) the data across 2 dimensions
>>> df = base.pipe(broadcast, node_loc=["node A", "node B"], mode=["m0", "m1"])
# Show part of the result
>>> df.dropna(axis=1)
mode node_loc technology value
0 m0 node A t 1.1
1 m0 node A t 2.2
2 m0 node B t 1.1
3 m0 node B t 2.2
4 m1 node A t 1.1
5 m1 node A t 2.2
6 m1 node B t 1.1
7 m1 node B t 2.2
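    The `labels` argument broadcasts over matched sets of labels rather than their full
    cartesian product. A sketch, reusing `base` from above:
    >>> import pandas as pd
    >>> labels = pd.DataFrame(
    ...     [["node A", "m0"], ["node B", "m1"]], columns=["node_loc", "mode"]
    ... )
    >>> len(base.pipe(broadcast, labels))
    4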
"""
def _check_dim(d):
try:
if not df[d].isna().all():
raise ValueError(f"Dimension {d} was not empty\n\n{df.head()}")
except KeyError:
raise ValueError(f"Dimension {d} not among {list(df.columns)}")
# Broadcast using matched labels for 1+ dimensions from a data frame
if labels is not None:
# Check the dimensions
for dim in labels.columns:
_check_dim(dim)
# Concatenate 1 copy of `df` for each row in `labels`
df = pd.concat(
[df.assign(**row) for _, row in labels.iterrows()],
ignore_index=True,
sort=False,
)
# Next, broadcast other dimensions given as keyword arguments
for dim, levels in kwargs.items():
_check_dim(dim)
if len(levels) == 0:
log.debug(
f"Don't broadcast over {repr(dim)}; labels {levels} have length 0"
)
continue
# - Duplicate the data
# - Drop the existing column named 'dim'
# - Re-add the column from the constructed MultiIndex
# - Reindex for sequential row numbers
df = (
pd.concat([df] * len(levels), keys=levels, names=[dim], sort=False)
.drop(dim, axis=1)
.reset_index(dim)
.reset_index(drop=True)
)
return df
def check_support(context, settings=dict(), desc: str = "") -> None:
"""Check whether a Context is compatible with certain `settings`.
Raises
------
:class:`NotImplementedError`
if any `context` value for a key of `settings` is not among the values in
`settings`.
:class:`KeyError`
if the key is not set on `context` at all.
See also
--------
:ref:`check-support`
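    Example
    -------
    A minimal sketch, assuming ``context.regions`` is set to one of the supported node
    lists:
    >>> check_support(
    ...     context,
    ...     settings=dict(regions=frozenset(["R11", "R14"])),
    ...     desc="Aggregation of emissions data",
    ... )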
"""
__tracebackhide__ = True
for key, values in settings.items():
if context[key] not in values:
raise NotImplementedError(
f"{desc} for {repr(values)}; got {repr(context[key])}"
)
def copy_column(column_name):
"""For use with :meth:`pandas.DataFrame.assign`.
Examples
--------
Modify `df` by filling the column 'baz' with the value ``3``, and copying the column
'bar' into column 'foo'.
>>> df.assign(foo=copy_column('bar'), baz=3)
Note that a similar assignment can be achieved with :meth:`~pandas.DataFrame.eval`:
>>> df.eval("foo = bar")
:func:`copy_column` is useful in the context of more complicated calls to
:meth:`~pandas.DataFrame.assign`.
"""
return lambda df: df[column_name]
def datetime_now_with_tz() -> datetime:
"""Current date and time with time zone information."""
tz = datetime.now().astimezone().tzinfo
return datetime.now(tz)
def either_dict_or_kwargs(name: str, dict_arg: Optional[dict], kwargs: dict) -> dict:
"""Return either `dict_arg` or `kwargs`; raise :class:`ValueError` if both."""
if dict_arg is None:
return kwargs
elif len(kwargs):
raise ValueError(f"Both {name}={{...}} and positional {name} args")
else:
return dict_arg.copy()
def ffill(
df: pd.DataFrame, dim: str, values: Sequence[CodeLike], expr: Optional[str] = None
) -> pd.DataFrame:
"""Forward-fill `df` on `dim` to cover `values`.
Parameters
----------
df : pandas.DataFrame
Data to fill forwards.
dim : str
Dimension to fill along. Must be a column in `df`.
values : list of str
Labels along `dim` that must be present in the returned data frame.
expr : str, optional
If provided, :meth:`.DataFrame.eval` is called. This can be used to assign one
column to another. For instance, if `dim` == "year_vtg" and `expr` is "year_act
= year_vtg", then forward filling is performed along the "year_vtg" dimension/
column, and then the filled values are copied to the "year_act" column.
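    Example
    -------
    An illustrative sketch: `df` contains data only for year_vtg 2020; it is extended
    to also cover 2025 and 2030, with "year_act" copied from "year_vtg":
    >>> result = ffill(df, "year_vtg", [2020, 2025, 2030], expr="year_act = year_vtg")
    >>> sorted(result["year_vtg"].unique())
    [2020, 2025, 2030]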
"""
if dim in ("value", "unit"):
raise ValueError(dim)
# Mapping from (values existing in `df`) -> equal or greater members of `values`
mapping = defaultdict(set)
last_seen = None
for v in sorted(set(values) | set(df[dim].unique())):
if v in df[dim].unique():
last_seen = v
mapping[last_seen].add(v)
def _maybe_eval(df):
return df.eval(expr) if expr is not None else df
dfs = [df]
for key, group_df in df.groupby(dim):
for new_label in sorted(mapping[key])[1:]:
# Duplicate the data; assign the new_label to `dim`
dfs.append(group_df.assign(**{dim: new_label}).pipe(_maybe_eval))
return pd.concat(dfs, ignore_index=True)
class KeyIterator(Protocol):
def __call__(self) -> "genno.Key": ...
def iter_keys(base: "genno.Key") -> KeyIterator:
"""Return an iterator over a sequence of keys starting with `base_key`.
This can be used for shorthand when constructing sequences of :mod:`genno`
computations.
Example
-------
>>> base_key = genno.Key("foo:a-b-c")
>>> k = iter_keys(base_key)
>>> k()
<foo:a-b-c:0>
>>> k()
<foo:a-b-c:1>
>>> k()
<foo:a-b-c:2>
"""
return partial(next, map(lambda i: base + str(i), count()))
def make_io(
src: tuple[str, str, str],
dest: tuple[str, str, str],
efficiency: float,
on: Literal["input", "output"] = "input",
**kwargs,
):
"""Return input and output data frames for a 1-to-1 technology.
Parameters
----------
src : tuple of str
Input (commodity, level, unit)
dest : tuple of str
Output (commodity, level, unit)
efficiency : float
Conversion efficiency.
    on : 'input' or 'output'
        If 'input', `efficiency` is applied to the input, so the output (and thus the
        activity level of the technology) is in `dest[2]` units. If 'output', the
        opposite.
kwargs
Passed to :func:`.make_df`.
Returns
-------
dict (str -> pd.DataFrame)
Keys are 'input' and 'output'; values are data frames.
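    Examples
    --------
    An illustrative sketch for a hypothetical technology "coal_ppl" consuming "coal"
    to produce "electricity"; the input coefficient expresses a 40 % conversion
    efficiency per unit of output:
    >>> data = make_io(
    ...     ("coal", "primary", "GWa"),
    ...     ("electricity", "secondary", "GWa"),
    ...     efficiency=1 / 0.4,
    ...     technology="coal_ppl",
    ... )
    >>> sorted(data.keys())
    ['input', 'output']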
"""
return dict(
input=message_ix.make_df(
"input",
commodity=src[0],
level=src[1],
unit=src[2],
value=efficiency if on == "input" else 1.0,
**kwargs,
),
output=message_ix.make_df(
"output",
commodity=dest[0],
level=dest[1],
unit=dest[2],
value=1.0 if on == "input" else efficiency,
**kwargs,
),
)
def make_matched_dfs(
base: Union[MutableMapping, pd.DataFrame],
**par_value: Union[float, pint.Quantity, dict],
) -> "MutableParameterData":
"""Return data frames derived from `base` for multiple parameters.
Creates one data frame per keyword argument.
Parameters
----------
base : pandas.DataFrame, dict, etc.
Used to populate other columns of each data frame. Duplicates—which occur when
the target parameter has fewer dimensions than `base`—are dropped.
    par_value :
        Argument names (e.g. ‘fix_cost’) are passed to :func:`.make_df`. If the value
        is :class:`float`, it overwrites the "value" column; if :class:`pint.Quantity`,
        its magnitude overwrites "value" and its units overwrite the "unit" column, as
        a formatted string; if :class:`dict`, its "value" entry is handled as above and
        any other entries overwrite the corresponding columns.
Returns
-------
:class:`dict` of :class:`pandas.DataFrame`
        one for each parameter in `par_value`.
Examples
--------
>>> input = make_df("input", ...)
    >>> cf_tl = make_matched_dfs(
    ...     input,
    ...     capacity_factor=1,
    ...     technical_lifetime=pint.Quantity(8, "year"),
    ... )
"""
replace: dict[str, Any] = dict()
data = ChainMap(replace, base)
result = dict()
for par, values in par_value.items():
replace.clear()
if isinstance(values, dict):
replace.update(values)
value = replace.pop("value")
else:
value = values
if isinstance(value, pint.Quantity):
replace["value"] = value.magnitude
replace["unit"] = f"{value.units:~}"
else:
replace["value"] = value
result[par] = (
message_ix.make_df(par, **data).drop_duplicates().reset_index(drop=True)
)
return result
def make_source_tech(
info: Union[message_ix.Scenario, ScenarioInfo], common, **values
) -> "MutableParameterData":
"""Return parameter data for a ‘source’ technology.
The technology has no inputs; its output commodity and/or level are determined by
`common`; either single values, or :obj:`None` if the result will be
:meth:`~pandas.DataFrame.pipe`'d through :func:`broadcast`.
Parameters
----------
info : .Scenario or .ScenarioInfo
common : dict
Passed to :func:`.make_df`.
**values
Values for 'capacity_factor' (optional; default 1.0), 'output', 'var_cost', and
optionally 'technical_lifetime'.
Returns
-------
dict
Suitable for :func:`add_par_data`.
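    Examples
    --------
    An illustrative sketch, assuming `scenario` is an existing :class:`.Scenario` and
    using a hypothetical technology and commodity:
    >>> data = make_source_tech(
    ...     scenario,
    ...     common=dict(
    ...         commodity="freshwater",
    ...         level="supply",
    ...         mode="M1",
    ...         technology="extract_freshwater",
    ...         time="year",
    ...         time_dest="year",
    ...         unit="km3/year",
    ...     ),
    ...     output=1.0,
    ...     var_cost=0.0,
    ... )
    >>> sorted(data.keys())
    ['capacity_factor', 'output', 'var_cost']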
"""
# Check arguments
if isinstance(info, message_ix.Scenario):
info = ScenarioInfo(info)
values.setdefault("capacity_factor", 1.0)
missing = {"capacity_factor", "output", "var_cost"} - set(values.keys())
if len(missing):
raise ValueError(f"make_source_tech() needs values for {repr(missing)}")
elif "technical_lifetime" not in values:
log.debug("No technical_lifetime for source technology")
# Create data for "output"
result = dict(
output=message_ix.make_df(
"output",
value=values.pop("output"),
year_act=info.Y,
year_vtg=info.Y,
**common,
)
.pipe(broadcast, node_loc=nodes_ex_world(info.N))
.pipe(same_node)
)
# Add data for other parameters
result.update(make_matched_dfs(base=result["output"], **values))
return result
def maybe_query(series: pd.Series, query: Optional[str]) -> pd.Series:
"""Apply :meth:`pandas.DataFrame.query` if the `query` arg is not :obj:`None`.
:meth:`~pandas.DataFrame.query` is not chainable (`pandas-dev/pandas#37941
<https://github.com/pandas-dev/pandas/issues/37941>`_). Use this function with
:meth:`pandas.Series.pipe`, passing an argument that may be :obj:`None`, to have a
chainable query operation that can be a no-op.
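    Example
    -------
    A sketch with a simple series indexed by a dimension "x":
    >>> import pandas as pd
    >>> s = pd.Series([1, 2, 3], index=pd.Index(["a", "b", "c"], name="x"))
    >>> s.pipe(maybe_query, None) is s
    True
    >>> s.pipe(maybe_query, "x != 'b'").tolist()
    [1, 3]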
"""
# Convert Series to DataFrame, query(), then retrieve the single column
return series if query is None else series.to_frame().query(query)[0]
def merge_data(base: "MutableParameterData", *others: "ParameterData") -> None:
"""Merge dictionaries of DataFrames together into `base`.
    For use with :mod:`genno`, see instead :func:`.report.operator.merge_data`, which
*returns* the merged data rather than updating the first argument.
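    Example
    -------
    A sketch, with ``df1``, ``df2``, and ``df3`` standing in for any parameter data
    frames:
    >>> base = dict(demand=df1)
    >>> other = dict(demand=df2, historical_activity=df3)
    >>> merge_data(base, other)
    >>> sorted(base.keys())
    ['demand', 'historical_activity']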
"""
for other in others:
for par, df in other.items():
base[par] = pd.concat([base.get(par, None), df])
def path_fallback(
*parts: Union[str, Path],
where: Union[str, list[Union[str, Path]]] = "",
context: Optional["Context"] = None,
) -> Path:
"""Locate a path constructed from `parts` found in the first of several directories.
    This makes it possible to implement ‘fallback’ behaviour in which files or
    directories in certain locations are used preferentially.
Parameters
----------
parts :
Path parts or fragments such as directory names and a final file name.
where :
Either:
- :class:`str` containing one or more of the following, separated by white
space:
- "cache": locate `parts` in the :mod:`message_ix_models` cache directory.
See :attr:`.Config.cache_path`.
- "local": locate `parts` in the user's local data directory (same as
:func:`local_data_path`).
- "package": locate `parts` in :mod:`message_ix_models` package data (same
as :func:`.package_data_path`).
- "private": locate `parts` in the :mod:`message_data` :file:`/data/`
directory (same as :func:`.private_data_path`).
- "test": locate test data in :py:`package_data_path("test", ...)`
- :class:`list` where each element is :class:`str` (one of the above) or a
:class:`pathlib.Path`.
Returns
-------
pathlib.Path
The first of the locations indicated by `where` in which the file or directory
`parts` exists.
Raises
------
ValueError
If `where` is empty or `parts` are not found in any of the indicated locations.
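    Example
    -------
    A sketch using a hypothetical file name: prefer a copy in the :mod:`message_data`
    private data directory, and fall back to the copy shipped with
    :mod:`message_ix_models`:
    >>> path_fallback("transport", "config.yaml", where="private package")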
"""
from .context import Context
context = context or Context.get_instance(-1)
dirs, test_dir = [], None
for item in where.split() if isinstance(where, str) else where:
if isinstance(item, str):
if item == "cache":
dirs.append(context.core.cache_path)
elif item == "local":
dirs.append(context.core.local_data)
elif item == "package":
dirs.append(package_data_path())
elif item == "private":
dirs.append(private_data_path())
elif item == "test":
test_dir = package_data_path("test")
dirs.append(test_dir)
else:
dirs.append(item)
for path in [d.joinpath(*parts) for d in dirs]:
if not path.exists():
once(log, logging.DEBUG, f"Not found: {path}")
continue
elif test_dir and path.is_relative_to(test_dir):
msg = f"Reading test (fuzzed, random, and/or partial) data from {path}"
once(log, logging.WARNING, msg)
return path
raise ValueError(
f"No directories identified among {where!r}"
if not dirs
else f"'{Path(*parts)!s}' not found in any of {dirs}"
)
def replace_par_data(
scenario: message_ix.Scenario,
parameters: Union[str, Sequence[str]],
filters: Mapping[str, Union[str, int, Collection[str], Collection[int]]],
to_replace: Mapping[str, Union[Mapping[str, str], Mapping[int, int]]],
) -> None:
"""Replace data in `parameters` of `scenario`.
Parameters
----------
scenario
Scenario in which to replace data.
parameters : str or sequence of str
Name(s) of parameters in which to replace data.
filters
Passed to :meth:`.Scenario.par` argument of the same name.
to_replace
Passed to :meth:`pandas.DataFrame.replace` argument of the same name.
Examples
--------
Replace data in the "relation_activity" parameter for a particular technology and
relation: assign the same values as entries in a different relation name for the
same technology.
>>> replace_par_data(
... scenario,
... "relation_activity",
... dict(technology="hp_gas_i", relation="CO2_r_c"),
... dict(relation={"CO2_r_c": "CO2_ind"}),
... )
"""
from message_ix_models.model.build import apply_spec
pars = parameters.split() if isinstance(parameters, str) else parameters
# Create a Spec that requires `scenario` to have all the set elements mentioned by
    # `filters` and/or `to_replace`
s = Spec()
for k, v in filters.items():
s.require.set[k].extend([v] if isinstance(v, (str, int)) else v)
for k, v in to_replace.items():
s.require.set[k].extend(v.keys())
s.require.set[k].extend(v.values())
# Use apply_spec() simply to check that `scenario` contains the expected items,
# before attempting to modify data
apply_spec(scenario, s)
msg = f"Replace {filters!r} with {to_replace!r}"
log.info(msg)
for par_name in pars:
with scenario.transact(f"{msg} in {par_name!r}"):
# Base data, to be replaced
to_remove = scenario.par(par_name, filters=filters)
# Remove the base data
scenario.remove_par(par_name, to_remove.drop(columns=["value", "unit"]))
# Add the modified data
scenario.add_par(par_name, to_remove.replace(to_replace))
log.info(f"{len(to_remove)} obs in {par_name!r}")
@singledispatch
def same_node(data: pd.DataFrame, from_col: str = "node_loc") -> pd.DataFrame:
"""Fill 'node_{,dest,loc,origin,rel,share}' in `df` from `from_col`."""
cols = list(
set(data.columns)
& ({"node", "node_loc", "node_origin", "node_dest", "node_rel", "node_share"})
- {from_col}
)
return data.assign(**{c: copy_column(from_col) for c in cols})
@same_node.register(dict)
def _(
data: "MutableParameterData", from_col: str = "node_loc"
) -> "MutableParameterData":
for key, df in data.items():
data[key] = same_node(df, from_col=from_col)
return data
@singledispatch
def same_time(data: pd.DataFrame) -> pd.DataFrame:
"""Fill 'time_origin'/'time_dest' in `df` from 'time'."""
cols = list(set(data.columns) & {"time_origin", "time_dest"})
return data.assign(**{c: copy_column("time") for c in cols})
@same_time.register(dict)
def _(data: "MutableParameterData") -> "MutableParameterData":
for key, df in data.items():
data[key] = same_time(df)
return data
def show_versions() -> str:
"""Output of :func:`ixmp.show_versions`, as a :class:`str`."""
from io import StringIO
import ixmp
from ._logging import preserve_log_handlers
# Retrieve package versions
buf = StringIO()
# show_versions() imports pyam-iamc, which in turn imports ixmp4, which removes all
# handlers from the root logger (?!). Preserve the message-ix-models logging config.
with preserve_log_handlers():
ixmp.show_versions(buf)
return buf.getvalue()
# FIXME Reduce complexity from 14 to ≤13
def strip_par_data( # noqa: C901
scenario: message_ix.Scenario,
set_name: str,
element: str,
dry_run: bool = False,
dump: Optional["MutableParameterData"] = None,
) -> int:
"""Remove `element` from `set_name` in scenario, optionally dumping to `dump`.
    Parameters
    ----------
    scenario
        Scenario from which to strip data.
    set_name : str
        Name of the set (for instance "technology") containing `element`.
    element : str
        Element to remove from `set_name` and from all parameter data indexed by it.
    dry_run : bool, optional
        If :data:`True`, only show what would be done.
    dump : dict, optional
        If provided, stripped data are stored in this dictionary. Otherwise, they are
        discarded.
Returns
-------
int
Total number of rows removed across all parameters.
See also
--------
add_par_data
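    Examples
    --------
    An illustrative sketch: remove the hypothetical technology "coal_ppl" and all
    parameter data that refer to it, keeping a copy of the removed data:
    >>> removed: dict = {}
    >>> strip_par_data(scenario, "technology", "coal_ppl", dump=removed)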
"""
par_list = scenario.par_list()
no_data = set() # Names of parameters with no data being stripped
total = 0 # Total observations stripped
if dump is None:
pars: Iterable[str] = [] # Don't iterate over parameters unless dumping
else:
log.info(
f"Remove data with {set_name}={element!r}"
+ (" (DRY RUN)" if dry_run else "")
)
# Iterate over parameters with ≥1 dimensions indexed by `set_name`
pars = scenario.items(indexed_by=set_name, par_data=False)
for par_name in pars:
if par_name not in par_list: # pragma: no cover
log.warning(
f" MESSAGEix parameter {par_name!r} missing in Scenario {scenario.url}"
)
continue
# Iterate over dimensions indexed by `set_name`
for dim, _ in filter(
lambda item: item[1] == set_name,
zip(scenario.idx_names(par_name), scenario.idx_sets(par_name)),
):
# Check for contents of par_name that include `element`
par_data = scenario.par(par_name, filters={dim: element})
N = len(par_data)
total += N
if N == 0:
# No data; no need to do anything further
no_data.add(par_name)
continue
elif dump is not None:
dump[par_name] = pd.concat(
[dump.get(par_name, pd.DataFrame()), par_data]
)
log.info(f" {N} rows in {par_name!r}")
# Show some debug info
for col in filter(
lambda c: c != set_name and c in par_data.columns,
("commodity", "level", "technology"),
):
log.info(f" with {col}={sorted(par_data[col].unique())}")
if dry_run:
continue
# Actually remove the data
scenario.remove_par(par_name, key=par_data)
# NB would prefer to do the following, but raises an exception:
# scenario.remove_par(par_name, key={set_name: [value]})
if not dry_run and dump is not None:
log.info(f" {total} rows total")
if no_data:
log.debug(f"No data removed from {len(no_data)} other parameters")
if not dry_run:
log.info(f"Remove {element!r} from set {set_name!r}")
try:
scenario.remove_set(set_name, element)
except Exception as e:
if "does not have an element" in str(e):
log.info(" …not found")
else: # pragma: no cover
raise
return total