# Source code for message_ix_models.util

import logging
from collections import ChainMap, defaultdict
from collections.abc import (
    Callable,
    Collection,
    Iterable,
    Mapping,
    MutableMapping,
    Sequence,
)
from datetime import datetime
from functools import partial, singledispatch, update_wrapper
from importlib.metadata import version
from itertools import count
from pathlib import Path
from typing import TYPE_CHECKING, Literal, Optional, Protocol, Union

import message_ix
import pandas as pd
import pint
from platformdirs import user_cache_path

from ._convert_units import convert_units, series_of_pint_quantity
from ._logging import mark_time, preserve_log_level, silence_log
from .cache import cached
from .common import (
    HAS_MESSAGE_DATA,
    MESSAGE_DATA_PATH,
    MESSAGE_MODELS_PATH,
    Adapter,
    MappingAdapter,
    load_package_data,
    load_private_data,
    local_data_path,
    package_data_path,
    private_data_path,
)
from .node import adapt_R11_R12, adapt_R11_R14, identify_nodes, nodes_ex_world
from .scenarioinfo import ScenarioInfo, Spec
from .sdmx import CodeLike, as_codes, eval_anno

if TYPE_CHECKING:
    import genno

    from message_ix_models.types import MutableParameterData, ParameterData

# Public API: the names exported by ``from message_ix_models.util import *``. Some are
# re-exported from the submodules imported above; the others are defined in this file.
__all__ = [
    "HAS_MESSAGE_DATA",
    "MESSAGE_DATA_PATH",
    "MESSAGE_MODELS_PATH",
    "Adapter",
    "MappingAdapter",
    "adapt_R11_R12",
    "adapt_R11_R14",
    "add_par_data",
    "aggregate_codes",
    "as_codes",
    "broadcast",
    "cached",
    "check_support",
    "convert_units",
    "copy_column",
    "datetime_now_with_tz",
    "eval_anno",
    "ffill",
    "identify_nodes",
    "iter_keys",
    "load_package_data",
    "load_private_data",
    "local_data_path",
    "make_io",
    "make_matched_dfs",
    "make_source_tech",
    "mark_time",
    "maybe_query",
    "merge_data",
    "minimum_version",
    "package_data_path",
    "path_fallback",
    "preserve_log_level",
    "private_data_path",
    "replace_par_data",
    "same_node",
    "same_time",
    "series_of_pint_quantity",
    "show_versions",
    "silence_log",
    "strip_par_data",
]

log = logging.getLogger(__name__)


def add_par_data(
    scenario: message_ix.Scenario, data: "ParameterData", dry_run: bool = False
) -> int:
    """Add `data` to `scenario`.

    The data frames in `data` are not modified: the work-around for empty units is
    applied to a copy that is passed to :meth:`message_ix.Scenario.add_par`.

    Parameters
    ----------
    scenario
        Scenario to which the data are added, one :meth:`~message_ix.Scenario.add_par`
        call per item of `data`.
    data
        Any mapping with keys that are valid :mod:`message_ix` parameter names, and
        values that are :class:`pandas.DataFrame` with at least a "unit" column.
    dry_run : optional
        Only show what would be done.

    Returns
    -------
    int
        Total number of rows in all values of `data`. With `dry_run`, this is the
        number of rows that would have been added.

    See also
    --------
    strip_par_data
    """
    # TODO optionally add units automatically
    # TODO allow units column entries to be pint.Unit objects

    total = 0

    for par_name, values in data.items():
        n_rows = values.shape[0]
        log.info(f"{n_rows} rows in {repr(par_name)}")
        # Formatting the frame is comparatively costly; only do it if it will be shown
        if log.isEnabledFor(logging.DEBUG):
            log.debug("\n" + values.to_string(max_rows=5))

        total += n_rows

        if dry_run:
            continue

        # Work around iiasa/ixmp#425: replace empty units with "-". Use assign() so
        # that the caller's data frame is left untouched.
        to_add = values.assign(
            unit=values["unit"].str.replace("^$", "-", regex=True)
        )

        try:
            scenario.add_par(par_name, to_add)
        except Exception:  # pragma: no cover
            # Show the offending data before propagating the exception
            log.error(f"Failed to add {par_name!r} data:\n{to_add.head()}")
            raise

    return total
def aggregate_codes(df: pd.DataFrame, dim: str, codes):  # pragma: no cover
    """Aggregate `df` along dimension `dim` according to `codes`.

    Not implemented: calling this function always raises
    :class:`NotImplementedError`. The statements after the ``raise`` are an
    unreachable draft.
    """
    raise NotImplementedError

    # Inverse mapping: ID of each child code → ID of its parent in `codes`
    mapping = {child.id: code.id for code in codes for child in code.child}

    for key, group_series in df.groupby(dim):
        print(key, group_series.replace({dim: mapping}))
def broadcast(
    df: pd.DataFrame, labels: Optional[pd.DataFrame] = None, **kwargs
) -> pd.DataFrame:
    """Fill missing data in `df` by broadcasting.

    :func:`broadcast` is suitable for use with partly-filled data frames returned by
    :func:`.message_ix.util.make_df`, with 1 column per dimension, plus a "value"
    column. It is also usable with :meth:`pandas.DataFrame.pipe` for chained
    operations.

    `labels` (if any) are handled first: one copy or duplicate of `df` is produced for
    each row (set of labels) in this argument. Then, `kwargs` are handled;
    :func:`broadcast` returns one copy for each element in the cartesian product of
    the dimension labels given by `kwargs`.

    Parameters
    ----------
    labels : pandas.DataFrame
        Each column (dimension) corresponds to one in `df`. Each row represents one
        matched set of labels for those dimensions.
    kwargs
        Keys are dimensions. Values are labels along that dimension to fill. A
        dimension with zero labels is skipped.

    Returns
    -------
    pandas.DataFrame
        The length is either 1 or an integer multiple of the length of `df`.

    Raises
    ------
    ValueError
        if any of the columns in `labels` or `kwargs` are not present in `df`, or if
        those columns are present but not empty.

    Examples
    --------
    >>> from message_ix import make_df
    >>> from message_ix_models.util import broadcast
    # Create a base data frame with some empty columns
    >>> base = make_df("input", technology="t", value=[1.1, 2.2])
    # Broadcast (duplicate) the data across 2 dimensions
    >>> df = base.pipe(broadcast, node_loc=["node A", "node B"], mode=["m0", "m1"])
    # Show part of the result
    >>> df.dropna(axis=1)
      mode node_loc technology  value
    0   m0   node A          t    1.1
    1   m0   node A          t    2.2
    2   m0   node B          t    1.1
    3   m0   node B          t    2.2
    4   m1   node A          t    1.1
    5   m1   node A          t    2.2
    6   m1   node B          t    1.1
    7   m1   node B          t    2.2
    """

    def _require_empty(name):
        # `df` is looked up at call time, so this always checks the current frame
        try:
            column = df[name]
        except KeyError:
            raise ValueError(f"Dimension {name} not among {list(df.columns)}")
        if not column.isna().all():
            raise ValueError(f"Dimension {name} was not empty\n\n{df.head()}")

    # First: matched labels for 1+ dimensions, given as rows of a data frame
    if labels is not None:
        for name in labels.columns:
            _require_empty(name)

        # 1 copy of `df` per row of `labels`, with that row's labels filled in
        copies = [df.assign(**row) for _, row in labels.iterrows()]
        df = pd.concat(copies, ignore_index=True, sort=False)

    # Second: other dimensions, given as keyword arguments
    for dim, levels in kwargs.items():
        _require_empty(dim)

        n_levels = len(levels)
        if n_levels == 0:
            log.debug(
                f"Don't broadcast over {dim!r}; labels {levels} have length 0"
            )
            continue

        # Duplicate the data, with the labels as an outer index level named `dim`
        stacked = pd.concat([df] * n_levels, keys=levels, names=[dim], sort=False)
        # Replace the empty column `dim` with that index level, then renumber rows
        df = stacked.drop(dim, axis=1).reset_index(dim).reset_index(drop=True)

    return df
def check_support(context, settings: Optional[Mapping] = None, desc: str = "") -> None:
    """Check whether a Context is compatible with certain `settings`.

    Parameters
    ----------
    context
        Object supporting item access by the keys of `settings`.
    settings : optional
        Mapping from keys to collections of supported values. If omitted or empty,
        nothing is checked.
    desc : optional
        Description used at the start of the exception message.

    Raises
    ------
    :class:`NotImplementedError`
        if any `context` value for a key of `settings` is not among the values in
        `settings`.
    :class:`KeyError`
        if the key is not set on `context` at all.

    See also
    --------
    :ref:`check-support`
    """
    __tracebackhide__ = True

    # `settings` defaults to None rather than a shared, mutable dict() instance
    for key, values in (settings or {}).items():
        current = context[key]
        if current not in values:
            raise NotImplementedError(
                f"{desc} for {repr(values)}; got {repr(current)}"
            )
def copy_column(column_name):
    """Return a callable that selects `column_name` from a data frame.

    Intended for use with :meth:`pandas.DataFrame.assign`, which calls the returned
    function with the data frame being modified.

    Examples
    --------
    Modify `df` by filling the column 'baz' with the value ``3``, and copying the
    column 'bar' into column 'foo'.

    >>> df.assign(foo=copy_column('bar'), baz=3)

    Note that a similar assignment can be achieved with
    :meth:`~pandas.DataFrame.eval`:

    >>> df.eval("foo = bar")

    :func:`copy_column` is useful in the context of more complicated calls to
    :meth:`~pandas.DataFrame.assign`.
    """

    def _select(df):
        return df[column_name]

    return _select
def datetime_now_with_tz() -> datetime:
    """Return the current date and time, including local time zone information."""
    # astimezone() without arguments attaches the system's local time zone
    return datetime.now().astimezone()
def ffill(
    df: pd.DataFrame, dim: str, values: Sequence[CodeLike], expr: Optional[str] = None
) -> pd.DataFrame:
    """Forward-fill `df` on `dim` to cover `values`.

    Parameters
    ----------
    df : pandas.DataFrame
        Data to fill forwards.
    dim : str
        Dimension to fill along. Must be a column in `df`.
    values : list of str
        Labels along `dim` that must be present in the returned data frame. Labels
        that sort before every label existing in `df` are not filled.
    expr : str, optional
        If provided, :meth:`.DataFrame.eval` is called on the added rows. This can be
        used to assign one column to another. For instance, if `dim` == "year_vtg"
        and `expr` is "year_act = year_vtg", then forward filling is performed along
        the "year_vtg" dimension/column, and then the filled values are copied to the
        "year_act" column.

    Returns
    -------
    pandas.DataFrame
        The rows of `df`, followed by the rows added by filling.

    Raises
    ------
    ValueError
        if `dim` is "value" or "unit".
    """
    if dim in ("value", "unit"):
        raise ValueError(dim)

    # Labels existing in `df`; computed once, rather than on every loop iteration
    existing = set(df[dim].unique())

    # Mapping from (values existing in `df`) -> equal or greater members of `values`
    mapping = defaultdict(set)
    last_seen = None
    for v in sorted(set(values) | existing):
        if v in existing:
            last_seen = v
        mapping[last_seen].add(v)

    def _maybe_eval(df):
        return df.eval(expr) if expr is not None else df

    dfs = [df]
    for key, group_df in df.groupby(dim):
        # The first sorted entry is `key` itself, for which data already exist
        for new_label in sorted(mapping[key])[1:]:
            # Duplicate the data; assign the new_label to `dim`
            dfs.append(group_df.assign(**{dim: new_label}).pipe(_maybe_eval))

    return pd.concat(dfs, ignore_index=True)
class KeyIterator(Protocol):
    """Callable that takes no arguments and returns a :class:`genno.Key`.

    This is the annotated return type of :func:`iter_keys`.
    """

    def __call__(self) -> "genno.Key": ...
def iter_keys(base: "genno.Key") -> KeyIterator:
    """Return a callable that yields a sequence of keys starting with `base`.

    Each call returns `base` plus the string form of the next integer, counting from
    0. This can be used for shorthand when constructing sequences of :mod:`genno`
    computations.

    Example
    -------
    >>> base_key = genno.Key("foo:a-b-c")
    >>> k = iter_keys(base_key)
    >>> k()
    <foo:a-b-c:0>
    >>> k()
    <foo:a-b-c:1>
    >>> k()
    <foo:a-b-c:2>
    """
    # Endless, lazily evaluated sequence base + "0", base + "1", …
    keys = (base + str(index) for index in count())
    return partial(next, keys)
def iter_parameters(set_name, scenario: Optional["message_ix.Scenario"] = None):
    """Iterate over MESSAGEix parameters with *set_name* as a dimension.

    If `scenario` is given, its :meth:`items` method is used. If `scenario` is
    :obj:`None`, or that call raises :class:`TypeError` (ixmp < 3.8.0), the names
    are taken from :data:`message_ix.models.MESSAGE_ITEMS` instead.

    .. deprecated:: 2023.11
       Use :meth:`ixmp.Scenario.par_list` with the :py:`indexed_by=...` argument
       instead.
    """
    # An explicit check is used rather than `assert`, which is removed under
    # `python -O` and would leave the scenario=None case unhandled
    if scenario is not None:
        try:
            yield from scenario.items(indexed_by=set_name, par_data=False)
            return
        except TypeError:  # ixmp < 3.8.0
            pass

    import message_ix.models

    for name, info in message_ix.models.MESSAGE_ITEMS.items():
        if info["ix_type"] == "par" and set_name in info["idx_sets"]:
            yield name
def make_io(
    src: tuple[str, str, str],
    dest: tuple[str, str, str],
    efficiency: float,
    on: Literal["input", "output"] = "input",
    **kwargs,
):
    """Return input and output data frames for a 1-to-1 technology.

    Parameters
    ----------
    src : tuple of str
        Input (commodity, level, unit)
    dest : tuple of str
        Output (commodity, level, unit)
    efficiency : float
        Conversion efficiency.
    on : 'input' or 'output'
        If 'input', `efficiency` applies to the input, and the output, thus the
        activity level of the technology, is in dest[2] units. If 'output', the
        opposite.
    kwargs
        Passed to :func:`.make_df`.

    Returns
    -------
    dict (str -> pd.DataFrame)
        Keys are 'input' and 'output'; values are data frames.
    """
    # Whichever side `efficiency` does not apply to gets a value of 1.0
    applies_to_input = on == "input"

    frame_in = message_ix.make_df(
        "input",
        commodity=src[0],
        level=src[1],
        unit=src[2],
        value=efficiency if applies_to_input else 1.0,
        **kwargs,
    )
    frame_out = message_ix.make_df(
        "output",
        commodity=dest[0],
        level=dest[1],
        unit=dest[2],
        value=1.0 if applies_to_input else efficiency,
        **kwargs,
    )

    return {"input": frame_in, "output": frame_out}
def make_matched_dfs(
    base: Union[MutableMapping, pd.DataFrame],
    **par_value: Union[float, pint.Quantity, dict],
) -> "MutableParameterData":
    """Return data frames derived from `base` for multiple parameters.

    Creates one data frame per keyword argument.

    Parameters
    ----------
    base : pandas.DataFrame, dict, etc.
        Used to populate other columns of each data frame. Duplicates—which occur when
        the target parameter has fewer dimensions than `base`—are dropped.
    par_value :
        Argument names (e.g. ‘fix_cost’) are passed to :func:`.make_df`. If the value
        is :class:`float`, it overwrites the "value" column; if
        :class:`pint.Quantity`, its magnitude overwrites "value" and its units the
        "unit" column, as a formatted string. If :class:`dict`, it must have a
        "value" entry that is handled in the same way; its other entries overwrite
        the columns of the same names.

    Returns
    -------
    :class:`dict` of :class:`pandas.DataFrame`
        one for each parameter in `par_value`.

    Examples
    --------
    >>> input = make_df("input", ...)
    >>> cf_tl = make_matched_dfs(
    >>>     input,
    >>>     capacity_factor=1,
    >>>     technical_lifetime=pint.Quantity(8, "year"),
    >>> )
    """
    # Entries in `overrides` take precedence over the entries of `base`
    overrides: dict = {}
    layered = ChainMap(overrides, base)

    frames = {}
    for name, spec in par_value.items():
        # Start from a clean state for every parameter
        overrides.clear()

        if isinstance(spec, dict):
            overrides.update(spec)
            quantity = overrides.pop("value")
        else:
            quantity = spec

        if isinstance(quantity, pint.Quantity):
            overrides.update(value=quantity.magnitude, unit=f"{quantity.units:~}")
        else:
            overrides["value"] = quantity

        frame = message_ix.make_df(name, **layered)
        frames[name] = frame.drop_duplicates().reset_index(drop=True)

    return frames
def make_source_tech(
    info: Union[message_ix.Scenario, ScenarioInfo], common, **values
) -> "MutableParameterData":
    """Return parameter data for a ‘source’ technology.

    The technology has no inputs; its output commodity and/or level are determined by
    `common`; either single values, or :obj:`None` if the result will be
    :meth:`~pandas.DataFrame.pipe`'d through :func:`broadcast`.

    Parameters
    ----------
    info : .Scenario or .ScenarioInfo
    common : dict
        Passed to :func:`.make_df`.
    **values
        Values for 'capacity_factor' (optional; default 1.0), 'output', 'var_cost',
        and optionally 'technical_lifetime'.

    Returns
    -------
    dict
        Suitable for :func:`add_par_data`.

    Raises
    ------
    ValueError
        if 'output' or 'var_cost' is not among `values`.
    """
    # Check arguments
    if isinstance(info, message_ix.Scenario):
        info = ScenarioInfo(info)

    values.setdefault("capacity_factor", 1.0)

    missing = {"capacity_factor", "output", "var_cost"} - set(values)
    if missing:
        raise ValueError(f"make_source_tech() needs values for {missing!r}")
    if "technical_lifetime" not in values:
        log.debug("No technical_lifetime for source technology")

    # Data for "output": all periods, broadcast over every node except "World"
    output = message_ix.make_df(
        "output",
        value=values.pop("output"),
        year_act=info.Y,
        year_vtg=info.Y,
        **common,
    )
    output = output.pipe(broadcast, node_loc=nodes_ex_world(info.N)).pipe(same_node)

    result = dict(output=output)

    # Data for the remaining parameters, with dimensions matched to "output"
    result.update(make_matched_dfs(base=output, **values))

    return result
def maybe_query(series: pd.Series, query: Optional[str]) -> pd.Series:
    """Apply :meth:`pandas.DataFrame.query` if the `query` arg is not :obj:`None`.

    :meth:`~pandas.DataFrame.query` is not chainable (`pandas-dev/pandas#37941
    <https://github.com/pandas-dev/pandas/issues/37941>`_). Use this function with
    :meth:`pandas.Series.pipe`, passing an argument that may be :obj:`None`, to have a
    chainable query operation that can be a no-op.

    Parameters
    ----------
    series
        Data to query. It may be named or unnamed.
    query : str or None
        Expression for :meth:`~pandas.DataFrame.query`, or :obj:`None` to return
        `series` unchanged.

    Returns
    -------
    pandas.Series
        `series` itself if `query` is :obj:`None`; otherwise the matching rows.
    """
    if query is None:
        return series

    # Convert Series to DataFrame, query(), then retrieve the single column. Select
    # the column by position: it is labelled 0 only if `series` has no name.
    return series.to_frame().query(query).iloc[:, 0]
def merge_data(base: "MutableParameterData", *others: "ParameterData") -> None:
    """Merge dictionaries of DataFrames together into `base`.

    For every parameter name in each of `others`, the data frame is appended to the
    entry of the same name in `base`, or stored as a new entry if `base` has none.
    `base` is modified in place.
    """
    for other in others:
        for par, df in other.items():
            # Existing data (if any) come first, followed by the data being merged
            parts = [base[par], df] if par in base else [df]
            base[par] = pd.concat(parts)
def minimum_version(
    expr: str, raises: Optional[Iterable[type[Exception]]] = None
) -> Callable:
    """Decorator for functions that require a minimum version of some upstream package.

    If the decorated function is called and the condition in `expr` is not met,
    :class:`.NotImplementedError` is raised with an informative message.

    The decorated function gains an attribute :py:`.minimum_version`, a pytest
    MarkDecorator that can be used on associated test code. This marks the test as
    XFAIL, raising :class:`.NotImplementedError` (directly); :class:`.RuntimeError` or
    :class:`.AssertionError` (for instance, via :mod:`.click` test utilities), or any
    of the classes given in the `raises` argument. The attribute is not added if
    pytest cannot be imported.

    See :func:`.prepare_reporter` / :func:`.test_prepare_reporter` for a usage example.

    Parameters
    ----------
    expr :
        Like "example 1.2.3.post0". The condition for the decorated function is that
        the installed version must be equal to or greater than this version.
    raises : optional
        Further exception classes to include in the XFAIL mark.
    """
    from packaging.version import parse

    package, v_min = expr.split(" ")
    v_package = version(package)

    # True if the installed version is older than the required one
    too_old = parse(v_package) < parse(v_min)
    message = f" with {package} {v_package} < {v_min}"

    def decorator(func):
        name = f"{func.__module__}.{func.__name__}()"

        def wrapper(*args, **kwargs):
            if too_old:
                raise NotImplementedError(f"{name}{message}.")
            return func(*args, **kwargs)

        # Make `wrapper` look like `func`: name, docstring, etc.
        update_wrapper(wrapper, func)

        try:
            import pytest

            expected = (
                NotImplementedError,  # Raised directly, above
                AssertionError,  # e.g. through CliRunner.assert_exit_0()
                RuntimeError,  # e.g. through genno.Computer
            ) + tuple(raises or ())  # Other exception classes

            # Store a MarkDecorator as an attribute of `wrapper`
            wrapper.minimum_version = pytest.mark.xfail(
                condition=too_old,
                raises=expected,
                reason=f"Not supported{message}",
            )
        except ImportError:
            pass  # Pytest not present; testing is not happening

        return wrapper

    return decorator
def path_fallback(
    *parts: Union[str, Path],
    where: Union[str, list[Union[str, Path]]] = "",
) -> Path:
    """Locate a path constructed from `parts` found in the first of several directories.

    This allows to implement ‘fallback’ behaviour in which files or directories in
    certain locations are used preferentially.

    Parameters
    ----------
    parts :
        Path parts or fragments such as directory names and a final file name.
    where :
        Either:

        - :class:`str` containing one or more of the following, separated by white
          space:

          - "cache": locate `parts` in the :mod:`message_ix_models` cache directory.
          - "local": locate `parts` in the user's local data directory (same as
            :func:`local_data_path`).
          - "package": locate `parts` in :mod:`message_ix_models` package data (same
            as :func:`.package_data_path`).
          - "private": locate `parts` in the :mod:`message_data` :file:`/data/`
            directory (same as :func:`.private_data_path`).
          - "test": locate test data in :py:`package_data_path("test", ...)`

        - :class:`list` where each element is :class:`str` (one of the above) or a
          :class:`pathlib.Path`.

        Strings other than the ones listed are ignored.

    Returns
    -------
    pathlib.Path
        The first of the locations indicated by `where` in which the file or directory
        `parts` exists.

    Raises
    ------
    ValueError
        If `where` is empty or `parts` are not found in any of the indicated
        locations.
    """
    # Named locations; wrapped in functions so that only the requested ones are
    # looked up
    named = {
        "cache": lambda: user_cache_path("message-ix-models"),
        "local": lambda: local_data_path(),
        "package": lambda: package_data_path(),
        "private": lambda: private_data_path(),
        "test": lambda: package_data_path("test"),
    }

    dirs = []
    for item in where.split() if isinstance(where, str) else where:
        if not isinstance(item, str):
            # Already a path; use directly
            dirs.append(item)
        elif item in named:
            dirs.append(named[item]())

    # Construct all candidates, then return the first that exists
    candidates = [directory.joinpath(*parts) for directory in dirs]
    for path in candidates:
        if path.exists():
            return path
        log.debug(f"Not found: {path}")

    if dirs:
        raise ValueError(f"'{Path(*parts)!s}' not found in any of {dirs}")
    raise ValueError(f"No directories identified among {where!r}")
def replace_par_data(
    scenario: message_ix.Scenario,
    parameters: Union[str, Sequence[str]],
    filters: Mapping[str, Union[str, int, Collection[str], Collection[int]]],
    to_replace: Mapping[str, Union[Mapping[str, str], Mapping[int, int]]],
) -> None:
    """Replace data in `parameters` of `scenario`.

    Parameters
    ----------
    scenario
        Scenario in which to replace data.
    parameters : str or sequence of str
        Name(s) of parameters in which to replace data. A single string may contain
        several names separated by white space.
    filters
        Passed to :meth:`.Scenario.par` argument of the same name.
    to_replace
        Passed to :meth:`pandas.DataFrame.replace` argument of the same name.

    Examples
    --------
    Replace data in the "relation_activity" parameter for a particular technology and
    relation: assign the same values as entries in a different relation name for the
    same technology.

    >>> replace_par_data(
    ...     scenario,
    ...     "relation_activity",
    ...     dict(technology="hp_gas_i", relation="CO2_r_c"),
    ...     dict(relation={"CO2_r_c": "CO2_ind"}),
    ... )
    """
    from message_ix_models.model.build import apply_spec

    if isinstance(parameters, str):
        pars = parameters.split()
    else:
        pars = parameters

    # Build a Spec that requires `scenario` to have all the set elements mentioned by
    # `filters` and/or `to_replace`
    spec = Spec()
    for name, elements in filters.items():
        if isinstance(elements, (str, int)):
            elements = [elements]
        spec.require.set[name].extend(elements)
    for name, pairs in to_replace.items():
        spec.require.set[name].extend(pairs.keys())
        spec.require.set[name].extend(pairs.values())

    # apply_spec() is used only to check that `scenario` contains the expected items,
    # before any data are modified
    apply_spec(scenario, spec)

    msg = f"Replace {filters!r} with {to_replace!r}"
    log.info(msg)

    for par_name in pars:
        with scenario.transact(f"{msg} in {par_name!r}"):
            # Existing data that match `filters`
            existing = scenario.par(par_name, filters=filters)

            # Remove these, identified by their key columns only…
            scenario.remove_par(par_name, existing.drop(columns=["value", "unit"]))

            # …and add them again with the labels replaced
            scenario.add_par(par_name, existing.replace(to_replace))

            log.info(f"{len(existing)} obs in {par_name!r}")
@singledispatch
def same_node(data: pd.DataFrame, from_col: str = "node_loc") -> pd.DataFrame:
    """Fill 'node_{,dest,loc,origin,rel,share}' in `data` from `from_col`.

    Only those columns that exist in `data` are filled; `from_col` itself is left
    as-is. A new data frame is returned.
    """
    node_columns = {
        "node",
        "node_loc",
        "node_origin",
        "node_dest",
        "node_rel",
        "node_share",
    }
    # Columns to overwrite: node columns present in `data`, except the source
    targets = (node_columns - {from_col}).intersection(data.columns)

    def _source(df):
        return df[from_col]

    return data.assign(**dict.fromkeys(targets, _source))
@same_node.register(dict)
def _(
    data: "MutableParameterData", from_col: str = "node_loc"
) -> "MutableParameterData":
    # Overload for a dict of data frames: apply same_node() to each value, storing
    # the result under the same key. `data` is modified in place and returned.
    for key in data:
        data[key] = same_node(data[key], from_col=from_col)
    return data
@singledispatch
def same_time(data: pd.DataFrame) -> pd.DataFrame:
    """Fill 'time_origin'/'time_dest' in `data` from 'time'.

    Only those columns that exist in `data` are filled. A new data frame is returned.
    """
    targets = {"time_origin", "time_dest"}.intersection(data.columns)

    def _time(df):
        return df["time"]

    return data.assign(**dict.fromkeys(targets, _time))
@same_time.register(dict)
def _(data: "MutableParameterData") -> "MutableParameterData":
    # Overload for a dict of data frames: apply same_time() to each value, storing
    # the result under the same key. `data` is modified in place and returned.
    for key in data:
        data[key] = same_time(data[key])
    return data
def show_versions() -> str:
    """Return the output of :func:`ixmp.show_versions`, as a :class:`str`."""
    from io import StringIO

    from . import ixmp
    from ._logging import preserve_log_handlers

    # In-memory text stream that receives the package versions
    stream = StringIO()

    # show_versions() imports pyam-iamc, which in turn imports ixmp4, which removes all
    # handlers from the root logger (?!). Preserve the message-ix-models logging config.
    with preserve_log_handlers():
        ixmp.show_versions(stream)

    return stream.getvalue()
# FIXME Reduce complexity from 14 to ≤13
def strip_par_data(  # noqa: C901
    scenario: message_ix.Scenario,
    set_name: str,
    element: str,
    dry_run: bool = False,
    dump: Optional["MutableParameterData"] = None,
) -> int:
    """Remove `element` from `set_name` in scenario, optionally dumping to `dump`.

    Parameters
    ----------
    scenario
        Scenario to modify.
    set_name : str
        Name of the set from which `element` is removed.
    element : str
        The set element to remove.
    dry_run : bool, optional
        If :data:`True`, only show what would be done: neither parameter data nor
        the set element are removed.
    dump : dict, optional
        If provided, parameters with dimensions indexed by `set_name` are searched for
        data that refer to `element`. These data are appended to the entry for the
        parameter in this dictionary, and (unless `dry_run`) removed from `scenario`.
        If not provided, parameters are not searched at all and only the set element
        itself is removed.

    Returns
    -------
    int
        Total number of parameter rows found that refer to `element`. These rows are
        counted also with `dry_run`. The total is 0 if `dump` is not provided.

    See also
    --------
    add_par_data
    """
    par_list = scenario.par_list()
    no_data = set()  # Names of parameters with no data being stripped
    total = 0  # Total observations stripped

    if dump is None:
        pars = []  # Don't iterate over parameters unless dumping
        # NOTE(review): presumably remove_set(), below, also discards parameter data
        # that refer to `element` — confirm against ixmp
    else:
        log.info(
            f"Remove data with {set_name}={element!r}"
            + (" (DRY RUN)" if dry_run else "")
        )
        # Iterate over parameters with ≥1 dimensions indexed by `set_name`
        pars = iter_parameters(set_name, scenario=scenario)

    for par_name in pars:
        if par_name not in par_list:  # pragma: no cover
            log.warning(
                f" MESSAGEix parameter {par_name!r} missing in Scenario {scenario.url}"
            )
            continue

        # Iterate over dimensions indexed by `set_name`. A parameter can have more than
        # one such dimension; each is handled separately.
        for dim, _ in filter(
            lambda item: item[1] == set_name,
            zip(scenario.idx_names(par_name), scenario.idx_sets(par_name)),
        ):
            # Check for contents of par_name that include `element`
            par_data = scenario.par(par_name, filters={dim: element})
            N = len(par_data)
            total += N

            if N == 0:
                # No data; no need to do anything further
                no_data.add(par_name)
                continue
            elif dump is not None:
                # Append to any data already dumped for this parameter
                dump[par_name] = pd.concat(
                    [dump.get(par_name, pd.DataFrame()), par_data]
                )

            log.info(f" {N} rows in {par_name!r}")

            # Show some debug info
            for col in filter(
                lambda c: c != set_name and c in par_data.columns,
                ("commodity", "level", "technology"),
            ):
                log.info(f" with {col}={sorted(par_data[col].unique())}")

            if dry_run:
                continue

            # Actually remove the data
            scenario.remove_par(par_name, key=par_data)

            # NB would prefer to do the following, but raises an exception:
            # scenario.remove_par(par_name, key={set_name: [value]})

    if not dry_run and dump is not None:
        log.info(f" {total} rows total")
    if no_data:
        log.debug(f"No data removed from {len(no_data)} other parameters")

    if not dry_run:
        log.info(f"Remove {element!r} from set {set_name!r}")
        try:
            scenario.remove_set(set_name, element)
        except Exception as e:
            # An element that is already absent is tolerated; any other failure is
            # propagated
            if "does not have an element" in str(e):
                log.info(" …not found")
            else:  # pragma: no cover
                raise

    return total