import logging
from collections import defaultdict
from copy import copy
from typing import Dict, List, Mapping, Optional, Sequence, Union
import message_ix
import pandas as pd
from message_ix.models import MESSAGE_ITEMS
from sdmx.model import AnnotableArtefact, Annotation, Code
from ._convert_units import convert_units, series_of_pint_quantity
from .cache import cached
from .common import (
MESSAGE_DATA_PATH,
MESSAGE_MODELS_PATH,
load_package_data,
load_private_data,
package_data_path,
private_data_path,
)
from .node import adapt_R11_R14, identify_nodes
__all__ = [
"MESSAGE_DATA_PATH",
"MESSAGE_MODELS_PATH",
"adapt_R11_R14",
"cached",
"check_support",
"convert_units",
"identify_nodes",
"load_package_data",
"load_private_data",
"maybe_query",
"package_data_path",
"private_data_path",
"series_of_pint_quantity",
]
log = logging.getLogger(__name__)
def add_par_data(
scenario: message_ix.Scenario,
data: Mapping[str, pd.DataFrame],
dry_run: bool = False,
):
"""Add `data` to `scenario`.
Parameters
----------
data
Dict with keys that are parameter names, and values are pd.DataFrame or other
arguments
dry_run : optional
Only show what would be done.
See also
--------
strip_par_data
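
    Examples
    --------
    A minimal sketch with a toy data frame; in a dry run the scenario is not
    touched, so :obj:`None` can stand in for an actual Scenario:

    >>> data = {"demand": pd.DataFrame({"value": [1.0, 2.0]})}
    >>> add_par_data(None, data, dry_run=True)
    2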
"""
total = 0
for par_name, values in data.items():
N = values.shape[0]
log.info(f"{N} rows in {repr(par_name)}")
log.debug(values.to_string(max_rows=5))
total += N
if dry_run:
continue
scenario.add_par(par_name, values)
return total
def as_codes(data: Union[List[str], Dict[str, Dict]]) -> List[Code]:
"""Convert *data* to a :class:`list` of :class:`.Code` objects.
Various inputs are accepted:
- :class:`list` of :class:`str`.
- :class:`dict`, in which keys are :attr:`.Code.id` and values are further
:class:`dict` with keys matching other :class:`.Code` attributes.
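
    Examples
    --------
    For example:

    >>> codes = as_codes(
    ...     {"coal": "Coal", "gas": dict(name="Natural gas", parent="coal")}
    ... )
    >>> str(codes[0].name)
    'Coal'
    >>> codes[1] in codes[0].child
    True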
"""
# Assemble results as a dictionary
result: Dict[str, Code] = {}
if isinstance(data, list):
# FIXME typing ignored temporarily for PR#9
data = dict(zip(data, data)) # type: ignore [arg-type]
elif not isinstance(data, Mapping):
raise TypeError(data)
for id, info in data.items():
if isinstance(info, str):
info = dict(name=info)
elif isinstance(info, Mapping):
info = copy(info)
else:
raise TypeError(info)
code = Code(
id=str(id),
name=info.pop("name", str(id).title()),
)
# Store the description, if any
try:
code.description = info.pop("description")
except KeyError:
pass
# Associate with a parent
try:
parent_id = info.pop("parent")
except KeyError:
pass # No parent
else:
result[parent_id].append_child(code)
# Associate with any children
for id in info.pop("child", []):
try:
code.append_child(result[id])
except KeyError:
pass # Not parsed yet
# Convert other dictionary (key, value) pairs to annotations
for id, value in info.items():
code.annotations.append(
Annotation(id=id, text=value if isinstance(value, str) else repr(value))
)
result[code.id] = code
return list(result.values())
def aggregate_codes(df: pd.DataFrame, dim: str, codes): # pragma: no cover
"""Aggregate `df` along dimension `dim` according to `codes`."""
    raise NotImplementedError

    # Draft implementation (currently unreachable)
# Construct an inverse mapping
mapping = {}
for code in codes:
mapping.update({child.id: code.id for child in code.child})
for key, group_series in df.groupby(dim):
print(key, group_series.replace({dim: mapping}))
def broadcast(df, **kwargs):
"""Fill missing data in `df` by broadcasting.
Arguments
---------
kwargs
Keys are dimensions. Values are labels along that dimension to fill.
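
    Examples
    --------
    A sketch with a toy frame in which the 'node' column is empty:

    >>> df = pd.DataFrame({"node": [None, None], "value": [1.0, 2.0]})
    >>> df.pipe(broadcast, node=["n1", "n2"])["node"].tolist()
    ['n1', 'n1', 'n2', 'n2']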
"""
for dim, levels in kwargs.items():
# Checks
assert df[dim].isna().all(), f"Dimension {dim} was not empty\n\n{df.head()}"
if len(levels) == 0:
log.debug(
f"Don't broadcast over {repr(dim)}; labels {levels} have length 0"
)
continue
# - Duplicate the data
# - Drop the existing column named 'dim'
# - Re-add the column from the constructed MultiIndex
# - Reindex for sequential row numbers
df = (
pd.concat([df] * len(levels), keys=levels, names=[dim])
.drop(dim, axis=1)
.reset_index(dim)
.reset_index(drop=True)
)
return df
def check_support(context, settings=dict(), desc: str = "") -> None:
"""Check whether a Context is compatible with certain `settings`.
Raises
------
:class:`NotImplementedError`
if any `context` value for a key of `settings` is not among the values in
`settings`.
:class:`KeyError`
if the key is not set on `context` at all.
See also
--------
:ref:`check-support`
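
    Examples
    --------
    A sketch in which a plain :class:`dict` stands in for a Context:

    >>> check_support(dict(regions="R11"), dict(regions=["R11", "R14"]))
    >>> check_support(dict(regions="R99"), dict(regions=["R11"]), "Example")
    Traceback (most recent call last):
        ...
    NotImplementedError: Example for ['R11']; got 'R99'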
"""
__tracebackhide__ = True
for key, values in settings.items():
if context[key] not in values:
raise NotImplementedError(
f"{desc} for {repr(values)}; got {repr(context[key])}"
)
def copy_column(column_name):
"""For use with :meth:`pandas.DataFrame.assign`.
Examples
--------
Modify `df` by filling the column 'baz' with the value ``3``, and copying the column
'bar' into column 'foo'.
>>> df.assign(foo=copy_column('bar'), baz=3)
"""
return lambda df: df[column_name]
def eval_anno(obj: AnnotableArtefact, id: str):
"""Retrieve the annotation `id` from `obj`, run :func:`eval` on its contents.
This can be used for unpacking Python values (e.g. :class:`dict`) stored as an
annotation on a :class:`~sdmx.model.Code`.
Returns :obj:`None` if no attribute exists with the given `id`.
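
    Examples
    --------
    For example:

    >>> c = Code(id="foo")
    >>> c.annotations.append(Annotation(id="bar", text="dict(baz=3)"))
    >>> eval_anno(c, "bar")
    {'baz': 3}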
"""
try:
value = str(obj.get_annotation(id=id).text)
except KeyError:
        # No such annotation
return None
try:
return eval(value)
except Exception:
# Something that can't be eval()'d, e.g. a string
return value
def ffill(
    df: pd.DataFrame,
    dim: str,
    values: Sequence[Union[str, Code]],
    expr: Optional[str] = None,
) -> pd.DataFrame:
"""Forward-fill `df` on `dim` to cover `values`.
Parameters
----------
df : .DataFrame
Data to fill forwards.
dim : str
Dimension to fill along. Must be a column in `df`.
values : list of str
Labels along `dim` that must be present in the returned data frame.
expr : str, optional
If provided, :meth:`.DataFrame.eval` is called. This can be used to assign one
column to another. For instance, if `dim` == "year_vtg" and `expr` is "year_act
= year_vtg", then forward filling is performed along the "year_vtg" dimension/
column, and then the filled values are copied to the "year_act" column.
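
    Examples
    --------
    A sketch with a toy frame holding a single vintage year:

    >>> df = pd.DataFrame({"year_vtg": [2020], "value": [1.0]})
    >>> ffill(df, "year_vtg", [2020, 2030, 2040])["year_vtg"].tolist()
    [2020, 2030, 2040]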
"""
if dim in ("value", "unit"):
raise ValueError(dim)
# Mapping from (values existing in `df`) -> equal or greater members of `values`
mapping = defaultdict(set)
last_seen = None
for v in sorted(set(values) | set(df[dim].unique())):
if v in df[dim].unique():
last_seen = v
mapping[last_seen].add(v)
def _maybe_eval(df):
return df.eval(expr) if expr is not None else df
dfs = [df]
for key, group_df in df.groupby(dim):
for new_label in sorted(mapping[key])[1:]:
# Duplicate the data; assign the new_label to `dim`
dfs.append(group_df.assign(**{dim: new_label}).pipe(_maybe_eval))
return pd.concat(dfs, ignore_index=True)
def iter_parameters(set_name):
"""Iterate over MESSAGEix parameters with *set_name* as a dimension.
Parameters
----------
set_name : str
Name of a set.
Yields
------
str
Names of parameters that have `set_name` indexing ≥1 dimension.
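
    Examples
    --------
    For example, the "demand" parameter is indexed by the "commodity" set:

    >>> "demand" in iter_parameters("commodity")
    True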
"""
# TODO move upstream. See iiasa/ixmp#402 and iiasa/message_ix#444
for name, info in MESSAGE_ITEMS.items():
if info["ix_type"] == "par" and set_name in info["idx_sets"]:
yield name
def make_io(src, dest, efficiency, on="input", **kwargs):
"""Return input and output data frames for a 1-to-1 technology.
Parameters
----------
src : tuple (str, str, str)
Input commodity, level, unit.
dest : tuple (str, str, str)
Output commodity, level, unit.
efficiency : float
Conversion efficiency.
on : 'input' or 'output'
If 'input', `efficiency` applies to the input, and the output, thus the activity
level of the technology, is in dest[2] units. If 'output', the opposite.
kwargs
Passed to :func:`make_df`.
Returns
-------
dict (str -> pd.DataFrame)
Keys are 'input' and 'output'; values are data frames.
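
    Examples
    --------
    A sketch for a hypothetical technology 'coal_ppl' converting coal to
    electricity, with the efficiency applied to the output:

    >>> dfs = make_io(
    ...     ("coal", "primary", "GWa"),
    ...     ("electr", "secondary", "GWa"),
    ...     efficiency=0.4,
    ...     on="output",
    ...     technology="coal_ppl",
    ...     year_vtg=[2020],
    ... )
    >>> dfs["output"].loc[0, "value"]
    0.4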
"""
return dict(
input=message_ix.make_df(
"input",
commodity=src[0],
level=src[1],
unit=src[2],
value=efficiency if on == "input" else 1.0,
**kwargs,
),
output=message_ix.make_df(
"output",
commodity=dest[0],
level=dest[1],
unit=dest[2],
value=1.0 if on == "input" else efficiency,
**kwargs,
),
)
def make_matched_dfs(base, **par_value):
"""Return data frames derived from *base* for multiple parameters.
*par_values* maps from parameter names (e.g. 'fix_cost') to values.
make_matched_dfs returns a :class:`dict` of :class:`pandas.DataFrame`, one for each
parameter in *par_value*. The contents of *base* are used to populate the columns
of each data frame, and the values of *par_value* overwrite the 'value' column.
Duplicates—which occur when the target parameter has fewer dimensions than
*base*—are dropped.
Examples
--------
>>> input = make_df('input', ...)
>>> cf_tl = make_matched_dfs(
>>> input,
>>> capacity_factor=1,
>>> technical_lifetime=1,
>>> )
"""
    # Copy dimension columns from `base`; the 'value' column is overwritten below
    data = {col: v for col, v in base.items() if col != "value"}
return {
par: message_ix.make_df(par, **data, value=value)
.drop_duplicates()
.reset_index(drop=True)
for par, value in par_value.items()
}
def make_source_tech(info, common, **values) -> Dict[str, pd.DataFrame]:
"""Return parameter data for a ‘source’ technology.
The technology has no inputs; its output commodity and/or level are determined by
`common`; either single values, or :obj:`None` if the result will be
:meth:`~DataFrame.pipe`'d through :func:`broadcast`.
Parameters
----------
info : ScenarioInfo
common : dict
Passed to :func:`make_df`.
**values
Values for 'capacity_factor' (optional; default 1.0), 'output', 'var_cost', and
optionally 'technical_lifetime'.
Returns
-------
dict
Suitable for :func:`add_par_data`.
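
    Examples
    --------
    A sketch, assuming `info` is a ScenarioInfo describing an existing scenario:

    >>> data = make_source_tech(
    ...     info,
    ...     common=dict(commodity="c", level="l", mode="M", technology="t",
    ...                 time="year", time_dest="year", unit="GWa"),
    ...     output=1.0,
    ...     var_cost=0.0,
    ... )
    >>> sorted(data.keys())
    ['capacity_factor', 'output', 'var_cost']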
"""
# Check arguments
values.setdefault("capacity_factor", 1.0)
missing = {"capacity_factor", "output", "var_cost"} - set(values.keys())
if len(missing):
raise ValueError(f"make_source_tech() needs values for {repr(missing)}")
elif "technical_lifetime" not in values:
log.debug("No technical_lifetime for source technology")
# Create data for "output"
result = dict(
output=message_ix.make_df(
"output",
value=values.pop("output"),
year_act=info.Y,
year_vtg=info.Y,
**common,
)
.pipe(broadcast, node_loc=info.N[1:])
.pipe(same_node)
)
# Add data for other parameters
result.update(make_matched_dfs(base=result["output"], **values))
return result
def maybe_query(series: pd.Series, query: Optional[str]) -> pd.Series:
"""Apply :meth:`pandas.Series.query` if the `query` arg is not :obj:`None`.
:meth:`~pandas.Series.query` is not chainable (`pandas-dev/pandas#37941
<https://github.com/pandas-dev/pandas/issues/37941>`_). Use this function with
:func:`pandas.Series.pipe`, passing an argument that may be :obj:`None`, to have a
chainable query operation that can be a no-op.
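
    Examples
    --------
    >>> s = pd.Series([1.0, 2.0], index=pd.Index(["a", "b"], name="n"))
    >>> s.pipe(maybe_query, None) is s
    True
    >>> s.pipe(maybe_query, "n == 'a'").tolist()
    [1.0]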
"""
# Convert Series to DataFrame, query(), then retrieve the single column
return series if query is None else series.to_frame().query(query)[0]
def merge_data(base, *others):
"""Merge dictionaries of DataFrames together into `base`."""
for other in others:
for par, df in other.items():
            # pd.concat() rather than the deprecated DataFrame.append()
            base[par] = pd.concat([base[par], df]) if par in base else df
def same_node(df):
"""Fill 'node_origin'/'node_dest' in `df` from 'node_loc'."""
cols = list(set(df.columns) & {"node_origin", "node_dest"})
return df.assign(**{c: copy_column("node_loc") for c in cols})
def strip_par_data(
    scenario,
    set_name,
    value,
    dry_run=False,
    dump: Optional[Dict[str, pd.DataFrame]] = None,
):
"""Remove data from parameters of *scenario* where *value* in *set_name*.
Returns
-------
Total number of rows removed across all parameters.
See also
--------
add_par_data
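
    Examples
    --------
    A sketch, assuming `scen` is an existing Scenario; with `dry_run`, the matching
    rows are collected in `dump` but nothing is removed:

    >>> dump = {}
    >>> total = strip_par_data(
    ...     scen, "technology", "coal_ppl", dry_run=True, dump=dump
    ... )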
"""
par_list = scenario.par_list()
no_data = []
total = 0
# Iterate over parameters with ≥1 dimensions indexed by `set_name`
for par_name in iter_parameters(set_name):
if par_name not in par_list: # pragma: no cover
log.warning(
f"MESSAGEix parameter {repr(par_name)} missing in Scenario "
f"{scenario.model}/{scenario.scenario}"
)
continue
# Iterate over dimensions indexed by `set_name`
for dim, _ in filter(
lambda item: item[1] == set_name,
zip(scenario.idx_names(par_name), scenario.idx_sets(par_name)),
):
# Check for contents of par_name that include *value*
par_data = scenario.par(par_name, filters={dim: value})
N = len(par_data)
if N == 0:
# No data; no need to do anything further
no_data.append(par_name)
continue
elif dump is not None:
dump[par_name] = pd.concat(
[dump.get(par_name, pd.DataFrame()), par_data]
)
log.info(f"Remove {N} rows in {repr(par_name)}")
# Show some debug info
for col in "commodity level technology".split():
if col == set_name or col not in par_data.columns:
continue
log.info(f" with {col}={sorted(par_data[col].unique())}")
if not dry_run:
# Actually remove the data
scenario.remove_par(par_name, key=par_data)
# NB would prefer to do the following, but raises an exception:
# scenario.remove_par(par_name, key={set_name: [value]})
total += N
level = logging.INFO if total > 0 else logging.DEBUG
log.log(level, f"{total} rows removed.")
log.debug(f"No data removed from {len(no_data)} other parameters")
return total