Source code for message_ix_models.util.sdmx

"""Utilities for handling objects from :mod:`sdmx`."""

import logging
from collections.abc import Mapping
from datetime import datetime
from enum import Enum, Flag
from importlib.metadata import version
from pathlib import Path
from typing import TYPE_CHECKING, Optional, Union
from warnings import warn

import sdmx
import sdmx.message
from iam_units import registry
from sdmx.model.v21 import AnnotableArtefact, Annotation, Code, InternationalString

from .common import package_data_path

if TYPE_CHECKING:
    from os import PathLike

    import sdmx.model.common

log = logging.getLogger(__name__)

CodeLike = Union[str, Code]


# FIXME Reduce complexity from 13 → ≤11
[docs] def as_codes( # noqa: C901 data: Union[list[str], dict[str, CodeLike]], ) -> list[Code]: """Convert `data` to a :class:`list` of :class:`.Code` objects. Various inputs are accepted: - :class:`list` of :class:`str`. - :class:`dict`, in which keys are :attr:`~sdmx.model.common.Code.id` and values are further :class:`dict` with keys matching other Code attributes. """ # Assemble results as a dictionary result: dict[str, Code] = {} if isinstance(data, list): # FIXME typing ignored temporarily for PR#9 data = dict(zip(data, data)) # type: ignore [arg-type] elif not isinstance(data, Mapping): raise TypeError(data) for id, info in data.items(): # Pass through Code; convert other types to dict() if isinstance(info, Code): result[info.id] = info continue elif isinstance(info, str): _info = dict(name=info) elif isinstance(info, Mapping): _info = dict(info) else: raise TypeError(info) # Create a Code object code = Code( id=str(id), name=_info.pop("name", str(id).title()), ) # Store the description, if any try: code.description = InternationalString(value=_info.pop("description")) except KeyError: pass # Associate with a parent try: parent_id = _info.pop("parent") except KeyError: pass # No parent else: result[parent_id].append_child(code) # Associate with any children for id in _info.pop("child", []): try: code.append_child(result[id]) except KeyError: pass # Not parsed yet # Convert other dictionary (key, value) pairs to annotations for id, value in _info.items(): code.annotations.append( Annotation(id=id, text=value if isinstance(value, str) else repr(value)) ) result[code.id] = code return list(result.values())
[docs] def eval_anno(obj: AnnotableArtefact, id: str): """Retrieve the annotation `id` from `obj`, run :func:`eval` on its contents. .. deprecated:: 2023.9.12 Use :meth:`sdmx.model.common.AnnotableArtefact.eval_annotation`, which provides the same functionality. """ warn( "message_ix_models.util.eval_anno; use sdmx.model.common.AnnotableArtefact" ".eval_annotation() instead.", DeprecationWarning, stacklevel=2, ) try: value = str(obj.get_annotation(id=id).text) except KeyError: # No such attribute return None try: return eval(value, {"registry": registry}) except Exception as e: # Something that can't be eval()'d, e.g. a plain string log.debug(f"Could not eval({value!r}): {e}") return value
[docs] class URNLookupEnum(Enum): """:class:`.Enum` subclass that allows looking up members using a URN.""" _urn_name: dict[str, str]
[docs] @classmethod def by_urn(cls, urn: str): """Return the :class:`.Enum` member given its `urn`.""" return cls[cls._urn_name[urn]]
[docs] def make_enum(urn, base=URNLookupEnum): """Create an :class:`.enum.Enum` (or `base`) with members from codelist `urn`.""" # Read the code list cl = read(urn) # Ensure the 0 member is NONE, not any of the codes names = ["NONE"] if issubclass(base, Flag) else [] names.extend(code.id for code in cl) # Create the class result = base(urn, names) if issubclass(base, URNLookupEnum): # Populate the URN → member name mapping result._urn_name = {code.urn: code.id for code in cl} return result
[docs] def read(urn: str, base_dir: Optional["PathLike"] = None): """Read SDMX object from package data given its `urn`.""" # Identify a path that matches `urn` base_dir = Path(base_dir or package_data_path("sdmx")) urn = urn.replace(":", "_") # ":" invalid on Windows paths = sorted( set(base_dir.glob(f"*{urn}*.xml")) | set(base_dir.glob(f"*{urn.upper()}*.xml")) ) if len(paths) > 1: log.info( f"Match {paths[0].relative_to(base_dir)} for {urn!r}; {len(paths) -1 } " "other result(s)" ) try: with open(paths[0], "rb") as f: msg = sdmx.read_sdmx(f) except IndexError: raise FileNotFoundError(f"'*{urn}*.xml', '*{urn.upper()}*.xml' or similar") for _, cls in msg.iter_collections(): try: return next(iter(msg.objects(cls).values())) except StopIteration: pass
[docs] def write(obj, base_dir: Optional["PathLike"] = None, basename: Optional[str] = None): """Store an SDMX object as package data.""" base_dir = Path(base_dir or package_data_path("sdmx")) if isinstance(obj, sdmx.message.StructureMessage): msg = obj assert basename else: # Set the URN of the object obj.urn = sdmx.urn.make(obj) # Wrap the object in a StructureMessage msg = sdmx.message.StructureMessage() msg.add(obj) # Identify a path to write the file. ":" is invalid on Windows. basename = basename or obj.urn.split("=")[-1].replace(":", "_") msg.header = sdmx.message.Header( source=f"Generated by message_ix_models {version('message_ix_models')}", prepared=datetime.now(), ) path = base_dir.joinpath(f"{basename}.xml") # Write path.parent.mkdir(parents=True, exist_ok=True) path.write_bytes(sdmx.to_xml(msg, pretty_print=True)) log.info(f"Wrote {path}")
[docs] def register_agency( agency: "sdmx.model.common.Agency", ) -> "sdmx.model.common.AgencyScheme": """Add `agency` to the :class:`.AgencyScheme` "IIASA_ECE:AGENCIES".""" # Read the existing agency scheme as_ = read("IIASA_ECE:AGENCIES") if agency in as_: log.info(f"Replace or update existing {as_[agency.id]!r}") as_.items[agency.id] = agency else: as_.append(agency) log.info(f"Updated {as_!r}") # Write to file again write(as_) return as_