# Source code for message_ix_models.util.sdmx

"""Utilities for handling objects from :mod:`sdmx`."""

import logging
from collections.abc import Mapping
from dataclasses import dataclass, fields
from datetime import datetime
from enum import Enum, Flag
from importlib.metadata import version
from pathlib import Path
from typing import TYPE_CHECKING, Optional, Union, cast
from warnings import warn

import sdmx
import sdmx.message
import sdmx.urn
from iam_units import registry
from sdmx.model import common

from .common import package_data_path

if TYPE_CHECKING:
    from os import PathLike
    from typing import TypeVar

    from sdmx.message import StructureMessage

    # TODO Use "from typing import Self" once Python 3.11 is the minimum supported
    Self = TypeVar("Self", bound="AnnotationsMixIn")

# Module-level logger, named after this module
log = logging.getLogger(__name__)

# Type alias: either a plain string or an SDMX Code object
CodeLike = Union[str, common.Code]

@dataclass
class AnnotationsMixIn:
    """Mix-in for dataclasses to allow (de)serializing as SDMX annotations.

    Each dataclass field corresponds to one annotation. The annotation ID is the
    field name with underscores ("_") replaced by hyphens ("-").
    """

    # TODO Type with overrides: list → list
    def get_annotations(self, _rtype: Union[type[list], type[dict]]):
        """Return a collection of :class:`.Annotation` for the fields of the object.

        Parameters
        ----------
        _rtype :
            Either :class:`list` or :class:`dict`; selects the return type.

        Returns
        -------
        list of :class:`Annotation <sdmx.model.common.Annotation>`
            if `_rtype` is :class:`list`.
        dict
            if `_rtype` is :class:`dict`. The dict has the one key "annotations",
            mapped to a :class:`list` of Annotations. This can be used as a keyword
            argument to the constructor of a :class:`.AnnotableArtefact` subclass.
        """
        result = []
        for f in fields(self):
            # Field name → annotation ID
            anno_id = f.name.replace("_", "-")
            # Store the repr() of the value, so from_obj() can evaluate it again
            result.append(
                common.Annotation(id=anno_id, text=repr(getattr(self, f.name)))
            )

        if _rtype is list:
            return result
        else:
            return dict(annotations=result)

    @classmethod
    def from_obj(cls: type["Self"], obj: "common.AnnotableArtefact") -> "Self":
        """Return a new instance of `cls` given an AnnotableArtefact `obj`.

        For each field of `cls`, in order, the annotation with the corresponding ID
        is evaluated with :meth:`eval_annotation` and passed positionally to the
        constructor.
        """
        args = []
        for f in fields(cls):
            anno_id = f.name.replace("_", "-")
            args.append(obj.eval_annotation(id=anno_id))

        return cls(*args)
# FIXME Reduce complexity from 13 → ≤11
def as_codes(  # noqa: C901
    data: Union[list[str], dict[str, CodeLike]],
) -> list[common.Code]:
    """Convert `data` to a :class:`list` of :class:`.Code` objects.

    Various inputs are accepted:

    - :class:`list` of :class:`str`.
    - :class:`dict`, in which keys are :attr:`.Code.id` and values are further
      :class:`dict` with keys matching other Code attributes.

    Within each value, the keys "name", "description", "parent" and "child" are
    handled specially; every other (key, value) pair is stored as an annotation on
    the code.

    Raises
    ------
    TypeError
        if `data` is neither a list nor a mapping, or if a value in `data` is not a
        :class:`.Code`, :class:`str` or mapping.
    KeyError
        if a "parent" ID refers to a code that has not been converted yet.
    """
    # Assemble results as a dictionary
    result: dict[str, common.Code] = {}

    if isinstance(data, list):
        # FIXME typing ignored temporarily for PR#9
        data = dict(zip(data, data))  # type: ignore [arg-type]
    elif not isinstance(data, Mapping):
        raise TypeError(data)

    for code_id, info in data.items():
        # Pass through Code; convert other types to dict()
        if isinstance(info, common.Code):
            result[info.id] = info
            continue
        elif isinstance(info, str):
            _info = dict(name=info)
        elif isinstance(info, Mapping):
            _info = dict(info)
        else:
            raise TypeError(info)

        # Create a Code object
        code = common.Code(
            id=str(code_id),
            name=_info.pop("name", str(code_id).title()),
        )

        # Store the description, if any
        try:
            code.description = common.InternationalString(
                value=_info.pop("description")
            )
        except KeyError:
            pass

        # Associate with a parent
        try:
            parent_id = _info.pop("parent")
        except KeyError:
            pass  # No parent
        else:
            result[parent_id].append_child(code)

        # Associate with any children
        for child_id in _info.pop("child", []):
            try:
                code.append_child(result[child_id])
            except KeyError:
                pass  # Not parsed yet

        # Convert other dictionary (key, value) pairs to annotations
        for anno_id, value in _info.items():
            code.annotations.append(
                common.Annotation(
                    id=anno_id, text=value if isinstance(value, str) else repr(value)
                )
            )

        result[code.id] = code

    return list(result.values())
def eval_anno(obj: common.AnnotableArtefact, id: str):
    """Retrieve the annotation `id` from `obj`, run :func:`eval` on its contents.

    .. deprecated:: 2023.9.12

       Use :meth:`sdmx.model.common.AnnotableArtefact.eval_annotation`, which
       provides the same functionality.

    Returns
    -------
    None
        if `obj` has no annotation with the given `id`.
    object
        the result of :func:`eval` on the annotation text, or the text itself
        (:class:`str`) if it cannot be evaluated.
    """
    warn(
        "message_ix_models.util.eval_anno; use sdmx.model.common.AnnotableArtefact"
        ".eval_annotation() instead.",
        DeprecationWarning,
        stacklevel=2,
    )

    try:
        value = str(obj.get_annotation(id=id).text)
    except KeyError:  # No such attribute
        return None

    try:
        # NOTE eval() executes arbitrary expressions: annotation text must come from
        #      a trusted source. Only "registry" is supplied as a global name.
        return eval(value, {"registry": registry})
    except Exception as e:  # Something that can't be eval()'d, e.g. a plain string
        log.debug(f"Could not eval({value!r}): {e}")
        return value
class URNLookupEnum(Enum):
    """:class:`.Enum` subclass that allows looking up members using a URN.

    Every subclass gets its own, initially empty, mapping from URN to member name
    in the class attribute ``_urn_name``; code that creates a subclass is expected
    to populate it.
    """

    # Do not treat "_urn_name" as the name of an enumeration member
    _ignore_ = "_urn_name"
    # Mapping from URN → member name
    _urn_name: dict

    def __init_subclass__(cls):
        # A fresh mapping per subclass, so subclasses do not share entries
        cls._urn_name = dict()

    @classmethod
    def by_urn(cls, urn: str):
        """Return the :class:`.Enum` member given its `urn`.

        Raises
        ------
        KeyError
            if `urn` is not in the mapping for this class.
        """
        # Look in the class's own __dict__, not attributes inherited from a parent
        return cls[cls.__dict__["_urn_name"][urn]]
def make_enum(urn, base=URNLookupEnum):
    """Create an :class:`.enum.Enum` (or `base`) with members from codelist `urn`.

    Parameters
    ----------
    urn : str
        Passed to :func:`read` to locate the code list; also used as the name of
        the created class.
    base :
        Enumeration class to use as the base of the result.

    Returns
    -------
    Enum
        with one member per code in the code list, named by the code's ID. If
        `base` is a subclass of :class:`.Flag`, a member "NONE" comes first.
    """
    # Read the code list
    cl = read(urn)

    # Ensure the 0 member is NONE, not any of the codes
    names = ["NONE"] if issubclass(base, Flag) else []
    names.extend(code.id for code in cl)

    # Create the class
    result = base(urn, names)

    if issubclass(base, URNLookupEnum):
        # Populate the URN → member name mapping
        for code in cl:
            result._urn_name[code.urn] = code.id

    return result
def read(urn: str, base_dir: Optional["PathLike"] = None):
    """Read SDMX object from package data given its `urn`.

    Parameters
    ----------
    urn : str
        URN or part of a URN. Any ":" is replaced with "_" before matching against
        file names.
    base_dir : optional
        Directory to search for ``*.xml`` files. If not given, the "sdmx" package
        data directory is used.

    Returns
    -------
    object
        The first object in the first non-empty collection of the message that was
        read; :obj:`None` if every collection is empty.

    Raises
    ------
    FileNotFoundError
        if no file name matches `urn`.
    """
    # Identify a path that matches `urn`
    base_dir = Path(base_dir or package_data_path("sdmx"))
    urn = urn.replace(":", "_")  # ":" invalid on Windows
    paths = sorted(
        set(base_dir.glob(f"*{urn}*.xml")) | set(base_dir.glob(f"*{urn.upper()}*.xml"))
    )

    # Check for "no match" explicitly, so that an IndexError raised while parsing
    # the file is not misreported as a missing file
    if not paths:
        raise FileNotFoundError(f"'*{urn}*.xml', '*{urn.upper()}*.xml' or similar")

    if len(paths) > 1:
        log.info(
            f"Match {paths[0].relative_to(base_dir)} for {urn!r}; {len(paths) - 1} "
            "other result(s)"
        )

    with open(paths[0], "rb") as f:
        msg = cast("StructureMessage", sdmx.read_sdmx(f))

    # Return the first object found in any collection of the message
    for _, cls in msg.iter_collections():
        try:
            return next(iter(msg.objects(cls).values()))
        except StopIteration:
            pass  # Empty collection; try the next
def write(obj, base_dir: Optional["PathLike"] = None, basename: Optional[str] = None):
    """Store an SDMX object as package data.

    Parameters
    ----------
    obj :
        Either a :class:`.StructureMessage`, written as-is, or another SDMX object,
        which is given a URN and wrapped in a new StructureMessage.
    base_dir : optional
        Directory in which to write. If not given, the "sdmx" package data
        directory is used.
    basename : optional
        File name without the ".xml" suffix. Required if `obj` is a
        StructureMessage; otherwise, by default, derived from the URN of `obj`.
    """
    base_dir = Path(base_dir or package_data_path("sdmx"))

    if isinstance(obj, sdmx.message.StructureMessage):
        msg = obj
        # A message has no URN from which to construct a file name
        assert basename, "basename is required when writing a StructureMessage"
    else:
        # Set the URN of the object
        obj.urn = sdmx.urn.make(obj)

        # Wrap the object in a StructureMessage
        msg = sdmx.message.StructureMessage()
        msg.add(obj)

        # Identify a path to write the file. ":" is invalid on Windows.
        basename = basename or obj.urn.split("=")[-1].replace(":", "_")

    # Replace the message header, recording the package version and current time
    msg.header = sdmx.message.Header(
        source=f"Generated by message_ix_models {version('message_ix_models')}",
        prepared=datetime.now(),
    )

    path = base_dir.joinpath(f"{basename}.xml")

    # Write
    path.parent.mkdir(parents=True, exist_ok=True)
    path.write_bytes(sdmx.to_xml(msg, pretty_print=True))

    log.info(f"Wrote {path}")
def register_agency(agency: "common.Agency") -> "common.AgencyScheme":
    """Add `agency` to the :class:`.AgencyScheme` "IIASA_ECE:AGENCIES".

    The agency scheme is read with :func:`read`, updated, and stored again with
    :func:`write`. If the scheme already contains `agency`, the existing entry
    with the same ID is replaced.

    Returns
    -------
    AgencyScheme
        The updated agency scheme.
    """
    # Read the existing agency scheme
    as_ = read("IIASA_ECE:AGENCIES")

    if agency in as_:
        log.info(f"Replace or update existing {as_[agency.id]!r}")
        as_.items[agency.id] = agency
    else:
        as_.append(agency)

    log.info(f"Updated {as_!r}")

    # Write to file again
    write(as_)

    return as_