# Source code for sdmx.message

"""Classes for SDMX messages.

:class:`Message` and related classes are not defined in the SDMX
:doc:`information model <implementation>`, but in the :ref:`SDMX-ML standard <formats>`.

:mod:`sdmx` also uses :class:`DataMessage` to encapsulate SDMX-JSON data returned by
data sources.
"""

import logging
import re
from collections.abc import Generator
from dataclasses import dataclass, field, fields
from datetime import datetime
from itertools import chain
from operator import attrgetter
from typing import TYPE_CHECKING, Text, get_args

import sdmx.urn
from sdmx import model
from sdmx.compare import Comparable
from sdmx.dictlike import DictLike, summarize_dictlike
from sdmx.dictlike import DictLikeDescriptor as DLD
from sdmx.format import Version
from sdmx.model import common, v21, v30
from sdmx.model.internationalstring import (
    InternationalString,
    InternationalStringDescriptor,
)
from sdmx.util import direct_fields

if TYPE_CHECKING:
    import requests

log = logging.getLogger(__name__)


def _summarize(obj, include: list[str] | None = None):
    """Yield one ``"name: value"`` line for each attribute of `obj` that is set.

    Helper for ``__repr__`` on :class:`Header` and :class:`Message` (sub)classes.

    Parameters
    ----------
    obj :
        Dataclass instance to summarize.
    include : list of str, optional
        Names of the attributes to show, in this order. If omitted or empty, all
        dataclass fields of `obj` are shown.

    Yields
    ------
    str
        ``"<name>: <value>"``; attributes whose value is :obj:`None` are skipped.
    """
    import requests

    names = include if include else [f.name for f in fields(obj)]
    for name in names:
        value = getattr(obj, name)
        if value is None:
            # Attributes that are not set do not appear in the summary
            continue

        if isinstance(value, datetime):
            # Quoted ISO 8601 string instead of the "datetime.datetime(…)" repr
            text = repr(value.isoformat())
        elif isinstance(value, requests.Response):
            # Don't use repr(), which displays the entire response body
            text = str(value)
        else:
            text = repr(value)

        yield f"{name}: {text}"


@dataclass
class Header:
    """Header of an SDMX-ML message.

    Every field has a default, so ``Header()`` gives an empty header. SDMX-JSON
    messages do not have headers.
    """

    #: (optional) Error code for the message.
    error: Text | None = None
    #: Date and time at which the data was extracted.
    extracted: datetime | None = None
    #: Identifier for the message.
    id: Text | None = None
    #: Date and time at which the message was generated.
    prepared: datetime | None = None
    #: Start of the time period covered by a :class:`.DataMessage`.
    reporting_begin: datetime | None = None
    #: End of the time period covered by a :class:`.DataMessage`.
    reporting_end: datetime | None = None
    #: Intended recipient of the message, e.g. the user's name for an
    #: authenticated service.
    receiver: model.Agency | None = None
    #: The :class:`.Agency` associated with the data :class:`~.source.Source`.
    sender: model.Agency | None = None
    #: Internationalised text, handled by :class:`.InternationalStringDescriptor`.
    source: InternationalStringDescriptor = InternationalStringDescriptor()
    #: Boolean flag; :obj:`False` by default.
    test: bool = False

    def __repr__(self):
        """Return ``<Header>`` followed by one indented line per field that is set."""
        return "\n  ".join(["<Header>", *_summarize(self)])


@dataclass
class Footer(Comparable):
    """Footer of an SDMX-ML message.

    SDMX-JSON messages do not have footers.
    """

    #:
    severity: str | None = None
    #: The body text of the Footer contains zero or more blocks of text.
    text: list[model.InternationalString] = field(default_factory=list)
    #:
    code: int | None = None

    def __post_init__(self):
        """Ensure every entry of :attr:`text` is an :class:`.InternationalString`."""
        converted = []
        for block in self.text:
            if not isinstance(block, InternationalString):
                # Anything else, for instance a plain str, is wrapped
                block = InternationalString(block)
            converted.append(block)
        self.text = converted


@dataclass
class Message(Comparable):
    """Base class for the message classes in this module."""

    #: SDMX version.
    version: Version = Version["2.1"]

    #: :class:`Header` instance.
    header: Header = field(default_factory=Header)
    #: (optional) :class:`Footer` instance.
    footer: Footer | None = None
    #: :class:`requests.Response` instance for the response to the HTTP request that
    #: returned the Message. This is not part of the SDMX standard.
    response: "requests.Response | None" = None

    def __str__(self):
        """Return the same text as :func:`repr`."""
        return repr(self)

    def __repr__(self):
        """Return the class name, the header, then the footer and response if set."""
        title = f"<sdmx.{self.__class__.__name__}>"
        # The header is nested one level deeper than the lines of the message itself
        header_text = repr(self.header).replace("\n", "\n  ")

        parts = [title, header_text]
        parts += _summarize(self, ["footer", "response"])
        return "\n  ".join(parts)


class ErrorMessage(Message):
    """:class:`Message` subclass that adds no fields or behaviour of its own."""


@dataclass
class StructureMessage(Message):
    """SDMX StructureMessage.

    Each field declared directly on this class is a collection that maps a key to the
    objects of one class. :meth:`add`, :meth:`get`, :meth:`objects`, and the ``in``
    operator select the collection from the class of the object involved.
    """

    #: Collection of :class:`.Categorisation`.
    categorisation: DLD[str, model.Categorisation] = DLD()
    #: Collection of :class:`.CategoryScheme`.
    category_scheme: DLD[str, model.CategoryScheme] = DLD()
    #: Collection of :class:`.Codelist`.
    codelist: DLD[str, model.Codelist] = DLD()
    #: Collection of :class:`.ConceptScheme`.
    concept_scheme: DLD[str, model.ConceptScheme] = DLD()
    #: Collection of :class:`.ContentConstraint`.
    constraint: DLD[str, model.BaseConstraint] = DLD()
    #: Collection of :class:`.CustomTypeScheme`.
    custom_type_scheme: DLD[str, model.CustomTypeScheme] = DLD()
    #: Collection of :class:`Dataflow(Definition) <.BaseDataflow>`.
    dataflow: DLD[str, model.BaseDataflow] = DLD()
    #: Collection of :class:`.HierarchicalCodelist`.
    hierarchical_codelist: DLD[str, v21.HierarchicalCodelist] = DLD()
    #: Collection of :class:`.v30.Hierarchy`.
    hierarchy: DLD[str, v30.Hierarchy] = DLD()
    #: Collection of :class:`Metadataflow(Definition) <.BaseMetadataflow>`.
    metadataflow: DLD[str, model.BaseMetadataflow] = DLD()
    #: Collection of :class:`MetadataStructureDefinition
    #: <.BaseMetadataStructureDefinition>`.
    metadatastructure: DLD[str, model.BaseMetadataStructureDefinition] = DLD()
    #: Collection of :class:`.NamePersonalisationScheme`.
    name_personalisation_scheme: DLD[str, model.NamePersonalisationScheme] = DLD()
    #: Collection of :class:`.OrganisationScheme`.
    organisation_scheme: DLD[str, model.OrganisationScheme] = DLD()
    #: Collection of :class:`.ProvisionAgreement`.
    provisionagreement: DLD[str, model.ProvisionAgreement] = DLD()
    #: Collection of :class:`.RulesetScheme`.
    ruleset_scheme: DLD[str, model.RulesetScheme] = DLD()
    #: Collection of :class:`DataStructureDefinition <.BaseDataStructureDefinition>`.
    structure: DLD[str, model.BaseDataStructureDefinition] = DLD()
    #: Collection of :class:`.StructureSet`.
    structureset: DLD[str, v21.StructureSet] = DLD()
    #: Collection of :class:`.TransformationScheme`.
    transformation_scheme: DLD[str, model.TransformationScheme] = DLD()
    #: Collection of :class:`.UserDefinedOperatorScheme`.
    user_defined_operator_scheme: DLD[str, model.UserDefinedOperatorScheme] = DLD()
    #: Collection of :class:`.ValueList` (SDMX 3.0 only).
    valuelist: DLD[str, v30.ValueList] = DLD()
    #: Collection of :class:`.VTLMappingScheme`.
    vtl_mapping_scheme: DLD[str, model.VTLMappingScheme] = DLD()

    def __post_init__(self):
        """Store references to all of the collections, for use by :meth:`get`."""
        names = map(attrgetter("name"), direct_fields(self.__class__))
        self._collections = [getattr(self, name) for name in names]

    def add(self, obj: model.IdentifiableArtefact):
        """Add `obj` to the StructureMessage.

        `obj` is stored under its :attr:`~.IdentifiableArtefact.id` in the first
        collection whose class it is an instance of.

        Raises
        ------
        TypeError
            if no collection accepts objects of the class of `obj`.
        """
        for name, cls in self.iter_collections():
            if isinstance(obj, cls):
                getattr(self, name)[obj.id] = obj
                return
        raise TypeError(type(obj))

    def get(
        self, obj_or_id: str | model.IdentifiableArtefact
    ) -> model.IdentifiableArtefact | None:
        """Retrieve the object matching `obj_or_id` from the StructureMessage.

        All collections are searched. An object matches if the ID looked up is equal
        to its key in the collection or to its :attr:`~.IdentifiableArtefact.id`, or
        if the URN of the object matches the ID.

        Parameters
        ----------
        obj_or_id : str or .IdentifiableArtefact
            - if an IdentifiableArtefact, its :attr:`~.IdentifiableArtefact.id` is the
              ID looked up.
            - if :class:`str`, this may be:

              - An exact match for some :attr:`.IdentifiableArtefact.id`.
              - Part of an SDMX :class:`URN`, for instance "FOO(1.2.3)", "MAINT:FOO", or
                "MAINT:FOO(1.2.3)".
              - A full SDMX :class:`URN`.

        Returns
        -------
        .IdentifiableArtefact
            the single matching object, or :any:`None` if there is no match.

        Raises
        ------
        ValueError
            if ≥2 objects match `obj_or_id`; for instance, two objects of different
            classes, or two objects of the same class with different
            :attr:`~.MaintainableArtefact.maintainer` or
            :attr:`~.VersionableArtefact.version`.
        """
        if isinstance(obj_or_id, model.IdentifiableArtefact):
            id_ = obj_or_id.id
        else:
            id_ = obj_or_id

        # Build a regular expression for matching object URNs
        try:
            # Handle `id_` as if it is a partial or complete URN
            full_urn = str(sdmx.urn.URN(sdmx.urn.expand(id_)))
        except ValueError:
            # `id_` is not a (partial) URN. Match `id_` immediately after one of the
            # separator characters.
            # NOTE(review): the pattern is not anchored on the right, so "FOO" also
            # matches a URN containing ":FOOBAR" — confirm this is intended.
            urn_expr = rf"[=:\.]{re.escape(id_)}"
        else:
            # Convert to a pattern; if the version is not given, match any version
            urn_expr = re.escape(full_urn).replace(r"\(None\)", r"\([^\)]*\)")

        urn_pat = re.compile(urn_expr)  # Compile once, outside the loops

        matches: list[model.IdentifiableArtefact] = []
        for collection in self._collections:
            for key, obj in collection.items():
                # Obtain a matchable string with the URN of `obj`
                try:
                    urn = obj.urn or sdmx.urn.make(obj)  # Existing or constructed
                except ValueError:
                    urn = ""  # No existing URN and unable to construct one
                if id_ in (key, obj.id) or urn_pat.search(urn):
                    matches.append(obj)

        if len(matches) > 1:
            raise ValueError(f"ambiguous; {obj_or_id!r} matches {matches!r}")

        return matches[0] if matches else None

    def iter_collections(self) -> Generator[tuple[str, type], None, None]:
        """Iterate over (field name, class of the contained objects) per collection."""
        yield from (
            (f.name, get_args(f.type)[1]) for f in direct_fields(self.__class__)
        )

    def iter_objects(
        self, external_reference: bool = True
    ) -> Generator[common.MaintainableArtefact, None, None]:
        """Iterate over all objects in the message.

        Parameters
        ----------
        external_reference : bool, optional
            If :obj:`False`, skip objects for which
            :attr:`~.MaintainableArtefact.is_external_reference` is true.
        """
        for _, cls in self.iter_collections():
            for obj in self.objects(cls).values():
                if external_reference or not obj.is_external_reference:
                    yield obj

    def objects(self, cls):
        """Get a reference to the attribute for objects of type `cls`.

        For example, if `cls` is the class :class:`DataStructureDefinition` (not an
        instance), return a reference to :attr:`structure`.

        Raises
        ------
        TypeError
            if `cls` is not a subclass of the class of any collection.
        """
        for name, collection_cls in self.iter_collections():
            if issubclass(cls, collection_cls):
                return getattr(self, name)
        raise TypeError(cls)

    def __contains__(self, item):
        """Return :obj:`True` if `item` is in the StructureMessage.

        Only the values of the first collection that accepts the class of `item` are
        checked. :class:`TypeError` is raised if there is no such collection.
        """
        for name, cls in self.iter_collections():
            if isinstance(item, cls):
                return item in getattr(self, name).values()
        raise TypeError(f"StructureMessage has no collection of {type(item)}")

    def __repr__(self):
        """Return the :class:`Message` repr plus a summary per non-empty collection."""
        parts = [super().__repr__()]

        # StructureMessage contents
        parts.extend(
            summarize_dictlike(value)
            for value in self.__dict__.values()
            if isinstance(value, DictLike) and value
        )

        return "\n  ".join(parts)


@dataclass
class DataMessage(Message):
    """SDMX Data Message.

    .. note:: A DataMessage may contain zero or more :class:`.DataSet`, so :attr:`data`
       is a list. To retrieve the first (and possibly only) data set in the message,
       access the first element of the list: :py:`msg.data[0]`.
    """

    #: :class:`list` of :class:`.DataSet`.
    data: list[model.BaseDataSet] = field(default_factory=list)
    #: :class:`.DataflowDefinition` that contains the data.
    dataflow: model.BaseDataflow | None = None
    #: The "dimension at observation level".
    observation_dimension: (
        model._AllDimensions
        | model.DimensionComponent
        | list[model.DimensionComponent]
        | None
    ) = None

    def __post_init__(self):
        """Create a default :attr:`dataflow` if none was given."""
        if self.dataflow is None:
            # Create a default of the class appropriate to the SDMX version
            self.dataflow = {
                Version["2.1"]: v21.DataflowDefinition,
                Version["3.0.0"]: v30.Dataflow,
            }[self.version]()

    # Convenience access
    @property
    def structure(self):
        """DataStructureDefinition used in the :attr:`dataflow`."""
        return self.dataflow.structure

    @property
    def structure_type(self) -> type[common.Structure]:
        """:class:`.Structure` subtype describing the contained (meta)data."""
        return {
            Version["2.1"]: v21.DataStructureDefinition,
            Version["3.0.0"]: v30.DataStructureDefinition,
        }[self.version]

    def __repr__(self):
        """Return the :class:`Message` repr plus a summary of the message contents."""
        lines = [super().__repr__()]

        # DataMessage contents
        if self.data:
            lines.append(f"DataSet ({len(self.data)})")
        lines.extend(_summarize(self, ["dataflow", "observation_dimension"]))

        return "\n  ".join(lines)

    def update(self) -> None:
        """Update :attr:`.observation_dimension`.

        The observation dimension (or "dimension at observation level") is determined
        automatically if:

        1. There is at least 1 :class:`DataSet <.BaseDataSet>` in the message.
        2. For at least 1 data set:

           - :attr:`~.BaseDataSet.structured_by` is defined.
           - There is at least 1 :class:`.Observation` in the data set. (:meth:`.update`
             checks only the first observation.)
           - The :attr:`.Observation.dimension` is a :class:`.Key` referring to exactly
             1 dimension.

        3. The dimension indicated by (2) is the same for all DataSets in the message.

        If not all these conditions are met, messages are logged with level DEBUG, and
        :attr:`.observation_dimension` is set to :any:`None`.

        .. note:: :meth:`.update` is not automatically called when data sets are added
           to or removed from :attr:`.data`. User code **should** call :meth:`.update`
           to reflect such changes.
        """
        if not self.data:
            log.debug("No DataSet in message")
            self.observation_dimension = None
            return

        dims = set()
        for ds in self.data:
            # Skip data sets that cannot indicate a dimension. These are explicit
            # checks rather than `assert` statements, so that they remain effective
            # when Python runs with -O, which strips assertions.
            if not ds.structured_by or not len(ds.obs):
                continue

            # Use the first observation
            o0 = ds.obs[0]
            if not o0.dimension:
                continue

            # Identify the dimensions specified per-observation
            d_a_o = tuple(o0.dimension.values.keys())

            if 1 == len(d_a_o):
                # Single dimension-at-observation: record the dimension object itself
                dims.add(ds.structured_by.dimensions.get(d_a_o[0]))
            else:
                # Record the tuple of IDs; a tuple is never used as the result, below
                dims.add(d_a_o)

        if len(dims) == 1 and not all(isinstance(d, tuple) for d in dims):
            self.observation_dimension = dims.pop()
        else:
            if len(dims) == 1:
                log.debug(f"More than 1 dimension at observation level: {dims.pop()}")
            elif len(dims) > 1:
                log.debug(
                    f"Multiple data sets with different observation dimension: {dims}"
                )
            elif not dims:
                log.debug(
                    f"Unable to determine observation dimension for {len(self.data)} "
                    "data set(s). Data set(s) may lack structure reference or "
                    "observations."
                )
            self.observation_dimension = None


@dataclass
class MetadataMessage(DataMessage):
    """SDMX Metadata Message."""

    @property
    def structure_type(self) -> type[common.Structure]:
        """MetadataStructureDefinition class for the :attr:`version` of the message."""
        by_version = {
            Version["2.1"]: v21.MetadataStructureDefinition,
            Version["3.0.0"]: v30.MetadataStructureDefinition,
        }
        return by_version[self.version]


class RegistryInterface(Message):
    """Common base class for registry interface messages.

    Adds no fields or behaviour to :class:`Message`; in this module it is the base
    class of :class:`SubmitStructureResponse`.
    """


@dataclass
class SubmitStructureResponse(RegistryInterface):
    """SDMX SubmitStructureResponse."""

    #: :class:`list` of :class:`.SubmissionResult`; empty by default.
    result: list[common.SubmissionResult] = field(default_factory=list)