Source code for message_ix_models.util.genno

"""Utilities for working with :mod:`.genno`.

Most code appearing here **should** be migrated upstream, to genno itself.
"""

from collections.abc import Callable
from typing import TYPE_CHECKING

if TYPE_CHECKING:
    from genno import Computer
    from genno.types import KeyLike


try:
    from genno import Keys
except ImportError:
    # genno < 1.28 with message_ix/ixmp 3.7.0
    # TODO Remove when support for these upstream versions is dropped

    from types import SimpleNamespace

    Keys = SimpleNamespace  # type: ignore [assignment, misc]

__all__ = [
    "Collector",
    "Keys",
    "update_computer",
]


[docs] class Collector: """Helper class to collect and merge data at a target key. Example usage: .. code-block:: python # Create a Collector instance collector = Collector(target="FOO", key_cb="{}::foo".format) # Associate it with a particular Computer c = collector.computer = Computer() # Add a task collect("bar", func, "k1", "k2", arg1="baz", arg2="qux") These statements have the following effects: 1. Add to `c` a task with the key "FOO" that calls :func:`.merge_data` on 1 or more inputs. 2. Construct a key "bar::foo" using the `key_cb`. 3. Add a task at "bar::foo" that calls :py:`func(k1, k2, arg1="baz", arg2="qux")`. 4. Add "bar::foo" (denoting the output of (3)) to the keys merged by (1). """ __slots__ = ("_computer", "_key_cb", "_target") def __init__( self, target: "KeyLike", key_cb: Callable[["KeyLike"], "KeyLike"] ) -> None: self._target = target self._key_cb = key_cb @property def computer(self) -> "Computer": return self._computer @computer.setter def computer(self, c: "Computer") -> None: from message_ix_models.report.operator import merge_data self._computer = c # Add the computation that merges data for this Collector assert self._target not in self._computer self._computer.graph[self._target] = (merge_data,) def __call__(self, _target_name: str, *args, **kwargs) -> "KeyLike": # Construct a key using the callback key = self._key_cb(_target_name) # Add a computation at `key` using the `args` and `kwargs` c = self._computer c.add(key, *args, **kwargs) # Extend the keys to be collected with `key` c.graph[self._target] = c.graph[self._target] + (key,) return key
[docs] def update_computer(a: "Computer", b: "Computer") -> None: """Update `a` with keys and tasks from `b`. For most keys, the task in `b` is copied to `a` at the same key. For the key "config", the contents of the :class:`dict` in `a` are updated with the values from the one in `b`. This overwrites or replaces existing configuration. .. todo:: Migrate upstream to a method like :py:`genno.Computer.update`. Raises ------ RuntimeError - if any key already exists in `a` with a task different from the corresponding one in `b`. - if the key "context" maps to different :class:`Context` instances in `a` and `b`. """ for k, v in b.graph.items(): if k == "context": if a.graph.get(k, v) is not v: raise RuntimeError(f"Existing task {k}{a.graph[k]} is not {v}") elif k == "config": target = a.graph.setdefault(k, dict()) target.update(v) else: if k in a.graph and a.graph[k] != v: raise RuntimeError( f"Existing task {k}{a.graph[k]} would be overwritten by {v}" ) assert k not in a.graph, k a.graph[k] = v