import logging
from copy import copy
from pathlib import Path
from typing import Any, Dict, List, Mapping, Optional, Union, cast
import message_ix
import pandas as pd
from message_ix.models import MESSAGE_ITEMS
from sdmx.model import AnnotableArtefact, Annotation, Code
log = logging.getLogger(__name__)
try:
    import message_data
except ImportError:
    log.warning("message_data is not installed")
    MESSAGE_DATA_PATH: Optional[Path] = None
else:  # pragma: no cover  (needs message_data)
    # Root directory of the message_data repository.
    MESSAGE_DATA_PATH = Path(message_data.__file__).parents[1]
# Directory containing message_ix_models.__init__
MESSAGE_MODELS_PATH = Path(__file__).parents[1]
#: Package data already loaded with :func:`load_package_data`.
PACKAGE_DATA: Dict[str, Any] = dict()
#: Data already loaded with :func:`load_private_data`.
PRIVATE_DATA: Dict[str, Any] = dict()
[docs]def add_par_data(
    scenario: message_ix.Scenario,
    data: Mapping[str, pd.DataFrame],
    dry_run: bool = False,
):
    """Add `data` to `scenario`.
    Parameters
    ----------
    data
        Dict with keys that are parameter names, and values are pd.DataFrame or other
        arguments
    dry_run : optional
        Only show what would be done.
    See also
    --------
    strip_par_data
    """
    total = 0
    for par_name, values in data.items():
        N = values.shape[0]
        log.info(f"{N} rows in {repr(par_name)}")
        log.debug(str(values))
        total += N
        if dry_run:
            continue
        scenario.add_par(par_name, values)
    return total 
[docs]def as_codes(data: Union[List[str], Dict[str, Dict]]) -> List[Code]:
    """Convert *data* to a :class:`list` of :class:`.Code` objects.
    Various inputs are accepted:
    - :class:`list` of :class:`str`.
    - :class:`dict`, in which keys are :attr:`.Code.id` and values are further
      :class:`dict` with keys matching other :class:`.Code` attributes.
    """
    # Assemble results as a dictionary
    result: Dict[str, Code] = {}
    if isinstance(data, list):
        # FIXME typing ignored temporarily for PR#9
        data = dict(zip(data, data))  # type: ignore [arg-type]
    elif not isinstance(data, Mapping):
        raise TypeError(data)
    for id, info in data.items():
        if isinstance(info, str):
            info = dict(name=info)
        elif isinstance(info, Mapping):
            info = copy(info)
        else:
            raise TypeError(info)
        code = Code(
            id=str(id),
            name=info.pop("name", str(id).title()),
        )
        # Store the description, if any
        try:
            code.description = info.pop("description")
        except KeyError:
            pass
        # Associate with a parent
        try:
            parent_id = info.pop("parent")
        except KeyError:
            pass  # No parent
        else:
            result[parent_id].append_child(code)
        # Associate with any children
        for id in info.pop("child", []):
            try:
                code.append_child(result[id])
            except KeyError:
                pass  # Not parsed yet
        # Convert other dictionary (key, value) pairs to annotations
        for id, value in info.items():
            code.annotations.append(
                Annotation(id=id, text=value if isinstance(value, str) else repr(value))
            )
        result[code.id] = code
    return list(result.values()) 
[docs]def eval_anno(obj: AnnotableArtefact, id: str):
    """Retrieve the annotation `id` from `obj`, run :func:`eval` on its contents.
    This can be used for unpacking Python values (e.g. :class:`dict`) stored as an
    annotation on a :class:`~sdmx.model.Code`.
    Returns :obj:`None` if no attribute exists with the given `id`.
    """
    try:
        value = str(obj.get_annotation(id=id).text)
    except KeyError:
        # No such attribute
        return None
    try:
        return eval(value)
    except Exception:
        # Something that can't be eval()'d, e.g. a string
        return value 
[docs]def iter_parameters(set_name):
    """Iterate over MESSAGEix parameters with *set_name* as a dimension.
    Parameters
    ----------
    set_name : str
        Name of a set.
    Yields
    ------
    str
        Names of parameters that have `set_name` indexing ≥1 dimension.
    """
    # TODO move upstream. See iiasa/ixmp#402 and iiasa/message_ix#444
    for name, info in MESSAGE_ITEMS.items():
        if info["ix_type"] == "par" and set_name in info["idx_sets"]:
            yield name 
def _load(
    var: Dict, base_path: Path, *parts: str, default_suffix: Optional[str] = None
) -> Any:
    """Helper for :func:`.load_package_data` and :func:`.load_private_data`."""
    key = " ".join(parts)
    if key in var:
        log.debug(f"{repr(key)} already loaded; skip")
        return var[key]
    path = _make_path(base_path, *parts, default_suffix=default_suffix)
    if path.suffix == ".yaml":
        import yaml
        with open(path, encoding="utf-8") as f:
            var[key] = yaml.safe_load(f)
    else:
        raise ValueError(path.suffix)
    return var[key]
def _make_path(
    base_path: Path, *parts: str, default_suffix: Optional[str] = None
) -> Path:
    p = base_path.joinpath(*parts)
    return p.with_suffix(p.suffix or default_suffix) if default_suffix else p
[docs]def load_package_data(*parts: str, suffix: Optional[str] = ".yaml") -> Any:
    """Load a :mod:`message_ix_models` package data file and return its contents.
    Data is re-used if already loaded.
    Example
    -------
    The single call:
    >>> info = load_package_data("node", "R11")
    1. loads the metadata file :file:`data/node/R11.yaml`, parsing its contents,
    2. stores those values at ``PACKAGE_DATA["node R11"]`` for use by other code, and
    3. returns the loaded values.
    Parameters
    ----------
    parts : iterable of str
        Used to construct a path under :file:`message_ix_models/data/`.
    suffix : str, optional
        File name suffix, including, the ".", e.g. :file:`.yaml`.
    Returns
    -------
    dict
        Configuration values that were loaded.
    """
    return _load(
        PACKAGE_DATA,
        MESSAGE_MODELS_PATH / "data",
        *parts,
        default_suffix=suffix,
    ) 
[docs]def load_private_data(*parts: str) -> Mapping:  # pragma: no cover (needs message_data)
    """Load a private data file from :mod:`message_data` and return its contents.
    Analogous to :mod:`load_package_data`, but for non-public data.
    Parameters
    ----------
    parts : iterable of str
        Used to construct a path under :file:`data/` in the :mod:`message_data`
        repository.
    Returns
    -------
    dict
        Configuration values that were loaded.
    Raises
    ------
    RuntimeError
        if :mod:`message_data` is not installed.
    """
    if MESSAGE_DATA_PATH is None:
        raise RuntimeError("message_data is not installed")
    return _load(PRIVATE_DATA, MESSAGE_DATA_PATH / "data", *parts) 
[docs]def package_data_path(*parts) -> Path:
    """Construct a path to a file under :file:`message_ix_models/data/`."""
    return _make_path(MESSAGE_MODELS_PATH / "data", *parts) 
[docs]def private_data_path(*parts) -> Path:  # pragma: no cover (needs message_data)
    """Construct a path to a file under :file:`data/` in :mod:`message_data`."""
    return _make_path(cast(Path, MESSAGE_DATA_PATH) / "data", *parts) 
[docs]def strip_par_data(
    scenario, set_name, value, dry_run=False, dump: Dict[str, pd.DataFrame] = None
):
    """Remove data from parameters of *scenario* where *value* in *set_name*.
    Returns
    -------
    Total number of rows removed across all parameters.
    See also
    --------
    add_par_data
    """
    par_list = scenario.par_list()
    no_data = []
    total = 0
    # Iterate over parameters with ≥1 dimensions indexed by `set_name`
    for par_name in iter_parameters(set_name):
        if par_name not in par_list:
            raise RuntimeError(  # pragma: no cover
                f"MESSAGEix parameter {repr(par_name)} missing in Scenario "
                f"{scenario.model}/{scenario.scenario}"
            )
        # Iterate over dimensions indexed by `set_name`
        for dim, _ in filter(
            lambda item: item[1] == set_name,
            zip(scenario.idx_names(par_name), scenario.idx_sets(par_name)),
        ):
            # Check for contents of par_name that include *value*
            par_data = scenario.par(par_name, filters={dim: value})
            N = len(par_data)
            if N == 0:
                # No data; no need to do anything further
                no_data.append(par_name)
                continue
            elif dump is not None:
                dump[par_name] = pd.concat(
                    [dump.get(par_name, pd.DataFrame()), par_data]
                )
            log.info(f"Remove {N} rows in {repr(par_name)}")
            # Show some debug info
            for col in "commodity level technology".split():
                if col == set_name or col not in par_data.columns:
                    continue
                log.info(f"  with {col}={sorted(par_data[col].unique())}")
            if not dry_run:
                # Actually remove the data
                scenario.remove_par(par_name, key=par_data)
                # NB would prefer to do the following, but raises an exception:
                # scenario.remove_par(par_name, key={set_name: [value]})
            total += N
    level = logging.INFO if total > 0 else logging.DEBUG
    log.log(level, f"{total} rows removed.")
    log.debug(f"No data removed from {len(no_data)} other parameters")
    return total