Source code for message_ix_models.util.context

"""Context and settings for :mod:`message_ix_models` code."""

import logging
from copy import deepcopy
from dataclasses import fields
from pathlib import Path

import ixmp
import message_ix
from click import BadOptionUsage

from .config import Config
from .ixmp import parse_url

# Logger for this module
log = logging.getLogger(__name__)

#: All Context instances, in order of creation (oldest first).
_CONTEXTS: list["Context"] = []


# Names that may be used directly on a Context, mapped to the Context key under which
# the object that actually stores them is found. Every field of the Config dataclass
# is stored on the "core" entry.
_ALIAS = {field.name: "core" for field in fields(Config)}


def _dealiased(base: str, data: dict) -> dict:
    """Pop from `data` the entries that `_ALIAS` assigns to `base`, and return them.

    `data` is modified in place: every key that is listed in `_ALIAS` with the value
    `base` is removed from it. A warning is logged if at least one such key was
    found.
    """
    extracted = {}
    for alias, target in _ALIAS.items():
        # Skip aliases for other bases, and aliases not supplied in `data`
        if target != base or alias not in data:
            continue
        extracted[alias] = data.pop(alias)

    if extracted:
        log.warning(
            f"Create a Config instance instead of passing {list(extracted)} to"
            " Context()"
        )

    return extracted


class Context(dict):
    """Context and settings for :mod:`message_ix_models` code."""

    # NB the docs contain a table of settings

    @classmethod
    def get_instance(cls, index=0) -> "Context":
        """Return a Context instance; by default, the first created.

        Parameters
        ----------
        index : int, optional
            Index of the Context instance to return, e.g. ``-1`` for the most recently
            created.
        """
        return _CONTEXTS[index]

    @classmethod
    def only(cls) -> "Context":
        """Return the only :class:`.Context` instance.

        Raises
        ------
        IndexError
            If there is more than one instance, or no instance at all.
        """
        if len(_CONTEXTS) > 1:
            raise IndexError(f"ambiguous: {len(_CONTEXTS)} Context instances")
        return _CONTEXTS[0]

    def __init__(self, *args, **kwargs):
        """Create a Context and register it for :meth:`get_instance`.

        Keyword arguments named in ``_ALIAS`` are removed from `kwargs` and passed
        to the corresponding configuration class; the resulting objects are stored
        under the keys "core" and "model". A fresh report configuration is always
        stored under "report". All remaining arguments are stored as with
        :class:`dict`.
        """
        # Imported here rather than at module level
        from message_ix_models.model import Config as ModelConfig
        from message_ix_models.report import Config as ReportConfig

        if not _CONTEXTS:
            log.info("Create root Context")

        # Handle keyword arguments going to known config dataclasses
        kwargs["core"] = Config(**_dealiased("core", kwargs))
        kwargs["model"] = ModelConfig(**_dealiased("model", kwargs))
        kwargs["report"] = ReportConfig()

        # Store any keyword arguments
        super().__init__(*args, **kwargs)

        # Store a reference for get_instance()
        _CONTEXTS.append(self)

    def _dealias(self, name):
        """Return ``(object, name)`` where `object` actually stores setting `name`.

        Raises
        ------
        KeyError
            If `name` is not listed in ``_ALIAS``.
        """
        base = _ALIAS[name]

        # Warn about direct reference to aliased attributes
        if base not in {"core", "model"}:  # pragma: no cover
            log.warning(f"Use Context.{base}.{name} instead of Context.{name}")

        return getattr(self, base), name

    # Item access
    def __getitem__(self, name):
        """Return an aliased setting from its config object, else the dict item."""
        try:
            return getattr(*self._dealias(name))
        except KeyError:
            # Not an alias: ordinary dict lookup
            return super().__getitem__(name)

    def __setitem__(self, name, value):
        """Set an aliased setting on its config object, else the dict item."""
        try:
            setattr(*self._dealias(name), value)
        except KeyError:
            # Not an alias: ordinary dict storage
            super().__setitem__(name, value)

    def update(self, arg=None, **kwargs):
        """Update from `arg` and/or `kwargs`, as :meth:`dict.update`."""
        # Force update() to use the __setitem__ above
        for k, v in dict(*filter(None, [arg]), **kwargs).items():
            self.__setitem__(k, v)

    # Attribute access
    def __setattr__(self, name, value):
        """Store `value` as the item `name`."""
        self[name] = value

    def __getattr__(self, name):
        """Return the item `name`; a missing item raises :class:`AttributeError`."""
        try:
            return self[name]
        except KeyError:
            raise AttributeError(name) from None

    def __deepcopy__(self, memo):
        """Return a deep copy that is also registered for :meth:`get_instance`.

        Any "_mp" entry is excluded from the copy and remains on the original.
        """
        # Temporarily remove the "_mp" entry so that it is not copied
        mp = self.pop("_mp", None)

        try:
            result = deepcopy(super(), memo)
        finally:
            # Restore the entry on the original, even if copying failed
            if mp is not None:
                self._mp = mp

        _CONTEXTS.append(result)

        return result

    def __eq__(self, other) -> bool:
        """Compare by identity only."""
        # Don't compare contents, only identity, for _CONTEXTS.index()
        if not isinstance(other, Context):
            return NotImplemented

        return id(self) == id(other)

    def __repr__(self):
        return f"<{self.__class__.__name__} object at {id(self)} with {len(self)} keys>"

    def delete(self):
        """Hide the current Context from future :meth:`.get_instance` calls.

        The first-created Context is never removed; in that case a warning is logged.
        In both cases :meth:`close_db` is called.
        """
        # Index of the *last* matching instance
        index = len(_CONTEXTS) - 1 - list(reversed(_CONTEXTS)).index(self)

        if index > 0:
            _CONTEXTS.pop(index)
        else:  # pragma: no cover
            # The `session_context` fixture means this won't occur during tests
            log.warning("Won't delete the only Context instance")

        self.close_db()

    def write_debug_archive(self) -> None:
        """Write an archive containing the files listed in :attr:`.debug_paths`.

        The archive file name is constructed using :func:`.unique_id` and appears in a
        :file:`debug` subdirectory under the :ref:`local data path <local-data>`.

        The archive also contains a file :file:`command.txt` that gives the full
        command-line used to invoke :program:`mix-models`.
        """
        from zipfile import ZIP_DEFLATED, ZipFile

        from .click import format_sys_argv, unique_id

        # Output file
        target = self.core.local_data.joinpath("debug", f"{unique_id()}.zip")
        log.info(f"Write to: {target}")

        target.parent.mkdir(parents=True, exist_ok=True)

        with ZipFile(target, mode="w", compression=ZIP_DEFLATED) as zf:
            # Write a file that contains the CLI invocation
            zf.writestr("command.txt", format_sys_argv())

            # Write the identified files
            for dp in self.core.debug_paths:
                if not dp.exists():
                    log.info(f"Not found: {dp}")
                    continue

                # Store under a name relative to the local data path
                zf.write(dp, arcname=dp.relative_to(self.core.local_data))

    def clone_to_dest(self, create=True) -> message_ix.Scenario:
        """Return a scenario based on the ``--dest`` command-line option.

        Parameters
        ----------
        create : bool, optional
            If :obj:`True` (the default) and the base scenario does not exist, a bare
            RES scenario is created. Otherwise, an exception is raised.

        Returns
        -------
        Scenario
            To prevent the scenario from being garbage collected, keep a reference to
            its Platform:

            .. code-block:: python

               s = context.clone_to_dest()
               mp = s.platform

        See also
        --------
        create_res
        """
        cfg = self.core
        if not cfg.dest_scenario:
            # No information on the destination; try to parse a URL, storing the keys
            # dest_platform and dest_scenario.
            self.handle_cli_args(
                url=cfg.dest, _store_as=("dest_platform", "dest_scenario")
            )

        try:
            # Get the base scenario, e.g. from the --url CLI argument
            scenario_base = self.get_scenario()

            # By default, clone to the same platform
            mp_dest = scenario_base.platform

            try:
                if cfg.dest_platform["name"] != mp_dest.name:
                    # Different platform
                    # Not tested; current test fixtures make it difficult to create
                    # *two* temporary platforms simultaneously
                    mp_dest = ixmp.Platform(**cfg.dest_platform)  # pragma: no cover
            except KeyError:
                # No destination platform name given; keep the base platform
                pass
        except Exception as e:
            log.info("Base scenario not given or found")
            log.debug(f"{type(e).__name__}: {e}")

            if not create:
                log.error("and create=False")
                raise

            # Create a bare RES to be the base scenario
            from message_ix_models.model.bare import create_res

            # Create on the destination platform
            ctx = deepcopy(self)
            ctx.core.platform_info.update(cfg.dest_platform)
            scenario_base = create_res(ctx)

            # Clone to the same platform
            mp_dest = scenario_base.platform

        # Clone
        log.info(f"Clone to {repr(cfg.dest_scenario)}")
        return scenario_base.clone(platform=mp_dest, **cfg.dest_scenario)

    def close_db(self):
        """Remove the "_mp" entry, if any, and call its ``close_db()`` method."""
        try:
            mp = self.pop("_mp")
        except KeyError:
            # Nothing stored; nothing to close
            return

        mp.close_db()

    def get_cache_path(self, *parts) -> Path:
        """Return a path to a local cache file, i.e. within :attr:`.Config.cache_path`.

        The directory containing the resulting path is created if it does not already
        exist.
        """
        result = self.core.cache_path.joinpath(*parts)

        # Ensure the directory exists
        result.parent.mkdir(parents=True, exist_ok=True)

        return result

    def get_local_path(self, *parts: str, suffix=None) -> Path:
        """Return a path under :attr:`.Config.local_data`.

        Parameters
        ----------
        parts :
            Path fragments, for instance directories, passed to
            :meth:`~.pathlib.PurePath.joinpath`.
        suffix :
            File name suffix including a "."—for instance, ".csv"—passed to
            :meth:`~.pathlib.PurePath.with_suffix`.
        """
        result = self.core.local_data.joinpath(*parts)
        return result.with_suffix(suffix) if suffix else result

    def get_platform(self, reload=False) -> ixmp.Platform:
        """Return a :class:`.Platform` from :attr:`.Config.platform_info`.

        When used through the CLI, :attr:`.Config.platform_info` is a 'base' platform
        as indicated by the --url or --platform options.

        If a Platform has previously been instantiated with :meth:`get_platform`, the
        same object is returned unless `reload=True`.
        """
        if not reload:
            # Return an existing Platform, if any
            try:
                return self["_mp"]
            except KeyError:
                pass

        # Close any existing Platform, e.g. to reload it
        try:
            self["_mp"].close_db()
            del self["_mp"]
        except KeyError:
            pass

        # Create a Platform
        self["_mp"] = ixmp.Platform(**self.core.platform_info)
        return self["_mp"]

    def get_scenario(self) -> message_ix.Scenario:
        """Return a :class:`.Scenario` from :attr:`~.Config.scenario_info`.

        When used through the CLI, :attr:`~.Config.scenario_info` is a ‘base’ scenario
        for an operation, indicated by the ``--url`` or
        ``--platform/--model/--scenario`` options.
        """
        return message_ix.Scenario(self.get_platform(), **self.core.scenario_info)

    def set_scenario(self, scenario: message_ix.Scenario) -> None:
        """Update :attr:`.Config.scenario_info` to match an existing `scenario`.

        :attr:`.Config.url` is also updated.
        """
        self.core.scenario_info.update(
            model=scenario.model, scenario=scenario.scenario, version=scenario.version
        )

        try:
            url = scenario.url
        except AttributeError:
            # Compatibility with ixmp <3.5
            url = f"{scenario.model}/{scenario.scenario}/{scenario.version}"

        self.core.url = f"ixmp://{scenario.platform.name}/{url}"

    def handle_cli_args(
        self,
        url=None,
        platform=None,
        model_name=None,
        scenario_name=None,
        version=None,
        local_data=None,
        verbose=False,
        _store_as=("platform_info", "scenario_info"),
    ):
        """Handle command-line arguments.

        May update the :attr:`.Config.local_data`, :attr:`~.Config.platform_info`,
        :attr:`~.Config.scenario_info`, and/or :attr:`~.Config.url` settings.

        Raises
        ------
        click.BadOptionUsage
            If `url` is given together with any of `platform`, `model_name`,
            `scenario_name`, or `version`.
        """
        self.core.verbose = verbose

        # Store the path to command-specific data and metadata
        if local_data:
            self.core.local_data = local_data

        # References to the Context settings to be updated
        platform_info = getattr(self.core, _store_as[0])
        scenario_info = getattr(self.core, _store_as[1])

        # Store information for the target Platform
        if url:
            if platform or model_name or scenario_name or version:
                raise BadOptionUsage(
                    "--platform --model --scenario and/or --version",
                    " redundant with --url",
                )

            self.core.url = url
            urlinfo = parse_url(url)
            platform_info.update(urlinfo[0])
            scenario_info.update(urlinfo[1])
        elif platform:
            platform_info["name"] = platform

        # Store information about the target Scenario
        if model_name:
            scenario_info["model"] = model_name
        if scenario_name:
            scenario_info["scenario"] = scenario_name
        if version:
            scenario_info["version"] = version

    def use_defaults(self, settings):
        """Update from `settings`.

        Parameters
        ----------
        settings : dict
            Maps each setting name to a sequence of allowed values; the first element
            is used as the default when the setting is not already present.

        Raises
        ------
        ValueError
            If the stored or default value of a setting is not among its allowed
            values.
        """
        for setting, info in settings.items():
            if setting not in self:
                log.info(f"Use default {setting}={info[0]}")

            value = self.setdefault(setting, info[0])

            if value not in info:
                raise ValueError(f"{setting} must be in {info}; got {value}")