import logging
import os
import pickle
from collections.abc import Mapping, MutableMapping, Sequence
from dataclasses import dataclass, field, fields, is_dataclass, replace
from hashlib import blake2s
from pathlib import Path
from typing import TYPE_CHECKING, Any, Hashable, Optional
import ixmp
from ._dataclasses import asdict
from .scenarioinfo import ScenarioInfo
if TYPE_CHECKING:
import message_ix
log = logging.getLogger(__name__)
ixmp.config.register("no message_data", bool, False)
ixmp.config.register("message local data", Path, Path.cwd())
def _local_data_factory():
"""Default values for :attr:`.Config.local_data."""
return (
Path(
os.environ.get("MESSAGE_LOCAL_DATA", None)
or ixmp.config.get("message local data")
)
.expanduser()
.resolve()
)
[docs]
@dataclass
class ConfigHelper:
"""Mix-in for :class:`dataclass`-based configuration classes.
This provides methods :meth:`read_file`, :meth:`replace`, and :meth:`from_dict` that
help to use :class:`dataclass` classes for handling :mod:`message_ix_models`
configuration.
All 3 methods take advantage of name manipulations: the characters "-" and " " are
replaced with underscores ("_"). This allows to write the names of attributes in
legible ways—e.g. "attribute name" or “attribute-name” instead of "attribute_name"—
in configuration files and/or code.
It also add :meth:`hexdigest`.
"""
@classmethod
def _fields(cls) -> set[str]:
"""Names of fields in `cls`."""
result = set(dir(cls))
if is_dataclass(cls):
result |= set(map(lambda f: f.name, fields(cls)))
return result
@classmethod
def _canonical_name(cls, name: Hashable) -> Optional[str]:
"""Canonicalize a name into a valid Python attribute name."""
_name = str(name).replace(" ", "_").replace("-", "_")
return _name if _name in cls._fields() else None
@classmethod
def _munge_dict(cls, data: Mapping[Hashable, Any], fail: str, kind: str):
for key, value in data.items():
name = cls._canonical_name(key)
if name:
yield name, value
else:
msg = f"{cls.__name__} has no attribute for {kind} {key!r}"
if fail == "raise":
raise ValueError(msg)
else:
log.info(f"{msg}; ignored")
[docs]
def read_file(self, path: Path, fail="raise") -> None:
"""Update configuration from file.
Parameters
----------
path
to a :file:`.yaml` file containing a top-level mapping.
fail : str
if "raise" (the default), any names in `path` which do not match attributes
of the dataclass raise a ValueError. Ottherwise, a message is logged.
"""
if path.suffix == ".yaml":
import yaml
with open(path, encoding="utf-8") as f:
data = yaml.safe_load(f)
elif path.suffix == ".json":
import json
with open(path) as f:
data = json.load(f)
else:
raise NotImplementedError(f"Read from {path.suffix}")
for key, value in self._munge_dict(data, fail, "file section"):
existing = getattr(self, key, None)
if is_dataclass(existing) and not isinstance(existing, type):
# Attribute value is also a dataclass; update it recursively
if isinstance(existing, ConfigHelper):
# Use name manipulation on the attribute value also
value = existing.replace(**value)
elif not isinstance(existing, type):
# https://github.com/python/mypy/issues/15843
# TODO Check that fix is available in mypy 1.7.x; remove
value = replace(existing, **value) # type: ignore [misc]
setattr(self, key, value)
[docs]
def replace(self, **kwargs):
"""Like :func:`dataclasses.replace` with name manipulation."""
return replace(
self,
**{k: v for k, v in self._munge_dict(kwargs, "raise", "keyword argument")},
)
[docs]
def update(self, **kwargs):
"""Update attributes in-place.
Raises
------
AttributeError
Any of the `kwargs` are not fields in the data class.
"""
# TODO use _munge_dict(); allow a positional argument
for k, v in kwargs.items():
if not hasattr(self, k):
raise AttributeError(k)
setattr(self, k, v)
[docs]
@classmethod
def from_dict(cls, data: Mapping):
"""Construct an instance from `data` with name manipulation."""
return cls(**{k: v for k, v in cls._munge_dict(data, "raise", "mapping key")})
[docs]
def hexdigest(self, length: int = -1) -> str:
"""Return a hex digest that is unique for distinct settings on the instance.
Returns
-------
str
If `length` is non-zero, a string of this length; otherwise a 32-character
string from :meth:`.blake2s.hexdigest`.
"""
# - Dump the dataclass instance to nested, sorted tuples. This is used instead
# of dataclass.astuple() which allows e.g. units to pass as a (possibly
# unsorted) dict.
# - Pickle this collection.
# - Hash.
h = blake2s(
pickle.dumps(asdict(self, dict_factory=lambda kv: tuple(sorted(kv))))
)
# Return the whole digest or a part
return h.hexdigest()[0 : length if length > 0 else h.digest_size]
[docs]
@dataclass
class Config:
"""Core/top-level settings for :mod:`message_ix_models` and :mod:`message_data`."""
#: Base path for cached data, e.g. as given by the :program:`--cache-path` CLI
#: option. Default: the directory :file:`message-ix-models` within the directory
#: given by :func:`.platformdirs.user_cache_path`.
cache_path: Optional[str] = None
#: Paths of files containing debug outputs. See
#: :meth:`.Context.write_debug_archive`.
debug_paths: Sequence[str] = field(default_factory=list)
#: Like :attr:`url`, used by e.g. :meth:`.clone_to_dest`.
dest: Optional[str] = None
#: Like :attr:`platform_info`, used by e.g. :meth:`.clone_to_dest`.
dest_platform: MutableMapping[str, str] = field(default_factory=dict)
#: Like :attr:`scenario_info`, used by e.g. :meth:`.clone_to_dest`.
dest_scenario: MutableMapping[str, str] = field(default_factory=dict)
#: Whether an operation should be carried out, or only previewed. Different modules
#: will respect :attr:`dry_run` in distinct ways, if at all, and **should** document
#: behaviour.
dry_run: bool = False
#: Base path for :ref:`system-specific data <local-data>`, i.e. as given by the
#: :program:`--local-data` CLI option or `message local data` key in the ixmp
#: configuration file.
local_data: Path = field(default_factory=_local_data_factory)
#: Keyword arguments—especially `name`—for the :class:`ixmp.Platform` constructor,
#: from the :program:`--platform` or :program:`--url` CLI option.
platform_info: MutableMapping[str, str] = field(default_factory=dict)
# Private reference to an ixmp.Platform
_mp: Optional["ixmp.Platform"] = None
#: Keyword arguments—`model`, `scenario`, and optionally `version`—for the
#: :class:`ixmp.Scenario` constructor, as given by the :program:`--model`/
#: :program:`--scenario` or :program:`--url` CLI options.
scenario_info: MutableMapping[str, str] = field(default_factory=dict)
#: Like `scenario_info`, but a list for operations affecting multiple scenarios.
scenarios: list[ScenarioInfo] = field(default_factory=list)
#: A scenario URL, e.g. as given by the :program:`--url` CLI option.
url: Optional[str] = None
#: Flag for causing verbose output to logs or stdout. Different modules will respect
#: :attr:`verbose` in distinct ways.
verbose: bool = False
def __post_init__(self):
if self.cache_path is None:
from platformdirs import user_cache_path
self.cache_path = user_cache_path("message-ix-models", ensure_exists=True)
def __deepcopy__(self, memo):
# Hide "_mp" from the copy
mp = self._mp
self._mp = None
# Create a copy of other field values; invokes deepcopy() on each
data = asdict(self)
# Restore reference
self._mp = mp
# Create the copy
return type(self)(**data)
[docs]
def close_db(self) -> None:
"""Close the database connection for the Platform given by :meth:`get_platform`.
If no such Platform exists or the connection is already closed, does nothing.
"""
try:
self._mp.close_db()
self._mp = None
except AttributeError:
pass
[docs]
def get_cache_path(self, *parts) -> Path:
"""Return a path to a local cache file, i.e. within :attr:`cache_path`.
The directory containing the resulting path is created if it does not already
exist.
"""
result = self.cache_path.joinpath(*parts)
result.parent.mkdir(parents=True, exist_ok=True) # Ensure the directory exists
return result
[docs]
def get_local_path(self, *parts: str, suffix=None) -> Path:
"""Return a path under :attr:`local_data`.
Parameters
==========
parts :
Path fragments, for instance directories, passed to
:meth:`~.pathlib.PurePath.joinpath`.
suffix :
File name suffix including a "."—for instance, ".csv"—passed to
:meth:`~.pathlib.PurePath.with_suffix`.
"""
result = self.local_data.joinpath(*parts)
return result.with_suffix(suffix) if suffix else result
[docs]
def get_scenario(self) -> "message_ix.Scenario":
"""Return a :class:`.Scenario` from :attr:`scenario_info`.
When used through the CLI, :attr:`scenario_info` is a ‘base’ scenario for an
operation, indicated by the ``--url`` or ``--platform/--model/--scenario``
options.
"""
import message_ix
return message_ix.Scenario(self.get_platform(), **self.scenario_info)
[docs]
def handle_cli_args(
self,
url: Optional[str] = None,
platform: Optional[str] = None,
model_name: Optional[str] = None,
scenario_name: Optional[str] = None,
version: Optional[str] = None,
local_data: Optional[str] = None,
verbose: bool = False,
_store_as: tuple[str, str] = ("platform_info", "scenario_info"),
):
"""Handle command-line arguments.
May update the :attr:`local_data`, :attr:`platform_info`,
:attr:`scenario_info`, and/or :attr:`url` settings.
"""
from click import BadOptionUsage
from .ixmp import parse_url
self.verbose = verbose
# Store the path to command-specific data and metadata
if local_data:
self.local_data = local_data
# References to the Context settings to be updated
platform_info = getattr(self, _store_as[0])
scenario_info = getattr(self, _store_as[1])
# Store information for the target Platform
if url:
if platform or model_name or scenario_name or version:
raise BadOptionUsage(
"--platform --model --scenario and/or --version",
" redundant with --url",
)
self.url = url
urlinfo = parse_url(url)
platform_info.update(urlinfo[0])
scenario_info.update(urlinfo[1])
elif platform:
platform_info["name"] = platform
# Store information about the target Scenario
if model_name:
scenario_info["model"] = model_name
if scenario_name:
scenario_info["scenario"] = scenario_name
if version:
scenario_info["version"] = version
[docs]
def set_scenario(self, scenario: "message_ix.Scenario") -> None:
"""Update :attr:`scenario_info` to match an existing `scenario`.
:attr:`url` is also updated.
"""
self.scenario_info.update(
model=scenario.model, scenario=scenario.scenario, version=scenario.version
)
try:
url = scenario.url
except AttributeError:
# Compatibility with ixmp <3.5
url = f"{scenario.model}/{scenario.scenario}/{scenario.version}"
self.url = f"ixmp://{scenario.platform.name}/{url}"
[docs]
def write_debug_archive(self) -> None:
"""Write an archive containing the files listed in :attr:`debug_paths`.
The archive file name is constructed using :func:`.unique_id` and appears in a
:file:`debug` subdirectory under the :ref:`local data path <local-data>`.
The archive also contains a file :file:`command.txt` that gives the full
command-line used to invoke :program:`mix-models`.
"""
from zipfile import ZIP_DEFLATED, ZipFile
from .click import format_sys_argv, unique_id
# Output file
target = self.local_data.joinpath("debug", f"{unique_id()}.zip")
log.info(f"Write to: {target}")
target.parent.mkdir(parents=True, exist_ok=True)
with ZipFile(target, mode="w", compression=ZIP_DEFLATED) as zf:
# Write a file that contains the CLI invocation
zf.writestr("command.txt", format_sys_argv())
# Write the identified files
for dp in self.debug_paths:
if not dp.exists():
log.info(f"Not found: {dp}")
continue
zf.write(dp, arcname=dp.relative_to(self.local_data))
# log.info(debug_path)