# Source code for message_ix_models.util.genno

"""Utilities for working with :mod:`.genno`.

Most code appearing here **should** be migrated upstream, to genno itself.
"""

from collections.abc import Callable
from typing import TYPE_CHECKING, Union

if TYPE_CHECKING:
    from genno import Computer
    from genno.types import AnyQuantity, KeyLike

try:
    from genno.operator import as_quantity
except ImportError:
    # genno < 1.25, e.g. with message_ix/ixmp 3.7.0
    # TODO Remove when support for these upstream versions is dropped

    def as_quantity(info: Union[dict, float, str]) -> "AnyQuantity":
        """Convert `info` to a :class:`genno.Quantity`.

        Fallback that is defined only if :func:`genno.operator.as_quantity` cannot
        be imported. `info` is handled according to its type:

        - :class:`str`: parsed using the :mod:`iam_units` registry; the magnitude
          and units of the parsed value are used.
        - :class:`float`: used as the value; no units are passed.
        - :class:`dict`: the items "_dim" and "_unit" are removed from a copy of
          `info`; the remaining items are the data, "_dim" is used to name the
          index, and "_unit" gives the units. `info` itself is not modified.

        Raises
        ------
        TypeError
            if `info` is not one of the above types.
        KeyError
            if `info` is a dict without "_dim" or "_unit".
        """
        import genno
        import pandas as pd
        from iam_units import registry

        if isinstance(info, str):
            parsed = registry.Quantity(info)
            magnitude, units = parsed.magnitude, parsed.units
            return genno.Quantity(magnitude, units=units)

        if isinstance(info, float):
            return genno.Quantity(info)

        if not isinstance(info, dict):
            raise TypeError(type(info))

        # Work on a copy so that the caller's mapping keeps its "_dim"/"_unit" items
        values = info.copy()
        dims = values.pop("_dim")
        units = values.pop("_unit")
        series = pd.Series(values).rename_axis(dims)
        return genno.Quantity(series, units=units)


try:
    from genno import Keys
except ImportError:
    # genno < 1.28 with message_ix/ixmp 3.7.0
    # TODO Remove when support for these upstream versions is dropped

    from types import SimpleNamespace

    # Stand-in for the missing class: SimpleNamespace stores each keyword argument
    # as a plain attribute, so `Keys(a=...)` followed by `.a` keeps working.
    # NOTE(review): presumably genno.Keys does more than store the values as
    # given; confirm before relying on such behaviour with old genno versions.
    Keys = SimpleNamespace  # type: ignore [assignment, misc]

# Public names of this module. "Keys" and "as_quantity" are either imported from
# genno or the fallbacks defined above.
__all__ = [
    "Collector",
    "Keys",
    "as_quantity",
]


class Collector:
    """Helper class to collect and merge data at a target key.

    Example usage:

    .. code-block:: python

        # Create a Collector instance
        collector = Collector(target="FOO", key_cb="{}::foo".format)

        # Associate it with a particular Computer
        c = collector.computer = Computer()

        # Add a task
        collector("bar", func, "k1", "k2", arg1="baz", arg2="qux")

    These statements have the following effects:

    1. Add to `c` a task with the key "FOO" that calls :func:`.merge_data` on 1 or
       more inputs.
    2. Construct a key "bar::foo" using the `key_cb`.
    3. Add a task at "bar::foo" that calls
       :py:`func(k1, k2, arg1="baz", arg2="qux")`.
    4. Add "bar::foo" (denoting the output of (3)) to the keys merged by (1).

    Parameters
    ----------
    target :
        Key of the task that merges all collected data.
    key_cb :
        Callable that receives the first argument to :meth:`__call__` and returns
        the key at which the corresponding task is added.
    """

    __slots__ = ("_computer", "_key_cb", "_target")

    def __init__(
        self, target: "KeyLike", key_cb: Callable[["KeyLike"], "KeyLike"]
    ) -> None:
        self._target = target
        self._key_cb = key_cb

    @property
    def computer(self) -> "Computer":
        """Computer to which tasks are added.

        Reading this before a value has been assigned raises
        :class:`AttributeError`. Assigning a Computer adds to it the task at the
        target key, initially with no inputs to merge.

        Raises
        ------
        AssertionError
            on assignment, if the Computer already contains the target key. In
            this case the Collector is left unchanged.
        """
        return self._computer

    @computer.setter
    def computer(self, c: "Computer") -> None:
        # Validate before storing `c`, so that a failure does not leave the
        # Collector bound to a Computer whose target task it does not own. This is
        # an explicit raise, rather than an ``assert`` statement, so that the check
        # is not removed under ``python -O``.
        if self._target in c:
            raise AssertionError(
                f"{self._target!r} is already present in the Computer"
            )

        from message_ix_models.report.operator import merge_data

        self._computer = c

        # Add the computation that merges data for this Collector
        c.graph[self._target] = (merge_data,)

    def __call__(self, _target_name: str, *args, **kwargs) -> "KeyLike":
        """Add a task and include its output in the data merged at the target.

        Parameters
        ----------
        _target_name :
            Passed to the `key_cb` to construct the key for the new task.
        args, kwargs :
            Passed to :py:`Computer.add()` after the constructed key.

        Returns
        -------
        KeyLike
            The key constructed by `key_cb`.

        Raises
        ------
        AttributeError
            if :attr:`computer` has not been set.
        """
        # Construct a key using the callback
        key = self._key_cb(_target_name)

        # Add a computation at `key` using the `args` and `kwargs`
        c = self._computer
        c.add(key, *args, **kwargs)

        # Extend the keys to be collected with `key`
        c.graph[self._target] = c.graph[self._target] + (key,)

        return key