Source code for message_ix_models.util.node

"""Utilities for nodes."""

import logging
from collections.abc import Sequence
from typing import Union

from message_ix import Scenario
from sdmx.model.v21 import Code

from .common import Adapter, MappingAdapter  # noqa: F401

log = logging.getLogger(__name__)

#: Names of dimensions indexed by 'node'.
#:
#: .. todo:: to be robust to changes in :mod:`message_ix`, read these names from that
#:    package.
NODE_DIMS = [
    "n",
    "node",
    "node_loc",
    "node_origin",
    "node_dest",
    "node_rel",
    "node_share",
]

#: Mapping from R11 to R12 node IDs.
R11_R12 = (
    ("R11_AFR", "R12_AFR"),
    ("R11_CPA", "R12_CHN"),
    ("R11_EEU", "R12_EEU"),
    ("R11_FSU", "R12_FSU"),
    ("R11_LAM", "R12_LAM"),
    ("R11_MEA", "R12_MEA"),
    ("R11_NAM", "R12_NAM"),
    ("R11_PAO", "R12_PAO"),
    ("R11_PAS", "R12_PAS"),
    ("R11_CPA", "R12_RCPA"),
    ("R11_SAS", "R12_SAS"),
    ("R11_WEU", "R12_WEU"),
)

#: Mapping from R11 to R14 node IDs.
R11_R14 = (
    ("R11_AFR", "R14_AFR"),
    ("R11_FSU", "R14_CAS"),
    ("R11_CPA", "R14_CPA"),
    ("R11_EEU", "R14_EEU"),
    ("R11_LAM", "R14_LAM"),
    ("R11_MEA", "R14_MEA"),
    ("R11_NAM", "R14_NAM"),
    ("R11_PAO", "R14_PAO"),
    ("R11_PAS", "R14_PAS"),
    ("R11_FSU", "R14_RUS"),
    ("R11_SAS", "R14_SAS"),
    ("R11_FSU", "R14_SCS"),
    ("R11_FSU", "R14_UBM"),
    ("R11_WEU", "R14_WEU"),
)

#: Adapt data from the R11 to the R14 node list.
#:
#: The data is adapted using the mappings in :data:`R11_R12` for each of the dimensions
#: in :data:`NODE_DIMS`.
adapt_R11_R12 = MappingAdapter({d: R11_R12 for d in NODE_DIMS})

#: Adapt data from the R11 to the R14 node list.
#:
#: The data is adapted using the mappings in :data:`R11_R14` for each of the dimensions
#: in :data:`NODE_DIMS`.
adapt_R11_R14 = MappingAdapter({d: R11_R14 for d in NODE_DIMS})


[docs]def identify_nodes(scenario: Scenario) -> str: """Return the ID of a node codelist given the contents of `scenario`. Returns ------- str The ID of the :doc:`/pkg-data/node` containing the regions of `scenario`. Raises ------ ValueError if no codelist can be identified, or the nodes in the scenario do not match the children of the “World” node in the codelist. """ from message_ix_models.model.structure import get_codes nodes = sorted(scenario.set("node")) # Candidate ID: split e.g. "R14_AFR" to "R14" id = nodes[0].split("_")[0] try: # Get the corresponding codelist codes = get_codes(f"node/{id}") except FileNotFoundError: raise ValueError(f"Couldn't identify node codelist from {repr(nodes)}") glb_node = [n.endswith("_GLB") for n in nodes] if any(glb_node): omit = nodes.pop(glb_node.index(True)) log.info(f"Omit known, non-standard node '{omit}' from set to match") # Expected list of nodes world = codes[codes.index("World")] # type: ignore [arg-type] codes = [world] + world.child try: assert set(nodes) == set(map(str, codes)) except AssertionError: raise ValueError( "\n".join( [ f"Node IDs suggest codelist {repr(id)}, values do not match:", repr(nodes), repr(codes), ] ) ) else: log.info(f"Identified node codelist {repr(id)}") return id
[docs]def nodes_ex_world(nodes: Sequence[Union[str, Code]]) -> list[Union[str, Code]]: """Exclude "World" and anything containing "GLB" from `nodes`. May also be used as a genno (reporting) operator. """ return list(filter(lambda n_: "GLB" not in n_ and n_ != "World", nodes))