Source code for message_ix_models.model.build

import logging
from collections.abc import Callable, Mapping
from typing import Optional, Union

import ixmp
import pandas as pd
from message_ix import Scenario
from sdmx.model.v21 import Code

from message_ix_models.util import add_par_data, strip_par_data
from message_ix_models.util.ixmp import maybe_check_out, maybe_commit
from message_ix_models.util.scenarioinfo import ScenarioInfo, Spec

log = logging.getLogger(__name__)


def _add_unit(mp: ixmp.Platform, unit: str, comment: str) -> None:
    """Add `unit` to `mp`, tolerating one known failure of :meth:`.Platform.add_unit`.

    An exception whose text contains "Error assigning an unit-key-id mapping" is
    logged as a warning and suppressed, but only when `unit` is the empty string.
    Every other exception propagates unchanged.
    """
    # TODO move upstream to ixmp.JDBCBackend
    log.info(f"Add unit {repr(unit)}")

    try:
        mp.add_unit(unit, comment)
    except Exception as exc:  # pragma: no cover
        known_failure = "Error assigning an unit-key-id mapping" in str(exc)
        if not (known_failure and str(unit) == ""):
            # Anything other than the known empty-unit failure is a real error
            raise
        log.warning(f"…skip {repr(unit)} (ixmp.JDBCBackend with Oracle database)")


def _set_elements(scenario: Scenario, set_name: str) -> list:
    """Return the elements of `set_name` currently in `scenario` as a list.

    If :meth:`.Scenario.set` returns a :class:`pandas.DataFrame`, each row is
    unpacked to one tuple; otherwise the result is converted with ``tolist()``.
    """
    base_set = scenario.set(set_name)
    if isinstance(base_set, pd.DataFrame):
        return list(base_set.itertuples(index=False))
    return base_set.tolist()


def _apply_spec_to_set(
    scenario: Scenario,
    spec: Union[Spec, Mapping[str, ScenarioInfo]],
    set_name: str,
    platform_regions: set,
    dry_run: bool,
    dump: Optional[dict[str, pd.DataFrame]],
) -> None:
    """Check, remove, and add the elements of one set, as directed by `spec`.

    Parameters
    ----------
    platform_regions : set
        Region codes already on the Platform; a "node" element not in this set is
        also registered with :meth:`.Platform.add_region`.
    dry_run : bool
        If :obj:`True`, no elements are added; the flag is also forwarded to
        :func:`.strip_par_data`.
    dump : dict or None
        Forwarded to :func:`.strip_par_data` as its `dump` argument.

    Raises
    ------
    ValueError
        If any element of ``spec["require"].set[set_name]`` is not in the set.
    """
    log.info(f"Set {repr(set_name)}")

    base = _set_elements(scenario, set_name)
    log.info(f" {len(base)} elements")
    # log.debug(', '.join(map(repr, base)))  # All elements; verbose

    # Check for required elements
    require = spec["require"].set[set_name]
    log.info(f" Check {len(require)} required elements")

    # Report *all* missing elements, both in the log and on the exception
    missing = [e for e in require if e not in base]
    if missing:
        log.error(f" {len(missing)} elements not found: {missing!r}")
        raise ValueError(
            f"{len(missing)} required element(s) of set {set_name!r} not found: "
            f"{missing!r}"
        )

    # Remove elements and associated parameter values
    for element in spec["remove"].set[set_name]:
        strip_par_data(scenario, set_name, element, dry_run=dry_run, dump=dump)

    # Add elements; nothing is added on a dry run
    add = [] if dry_run else spec["add"].set[set_name]
    for element in add:
        name = element.id if isinstance(element, Code) else element
        scenario.add_set(set_name, name)
        if set_name == "node" and name not in platform_regions:
            scenario.platform.add_region(name, "region")

    if len(add):
        log.info(f" Add {len(add)} element(s)")
        log.debug(" " + ellipsize(add))

    log.info(" ---")


def _add_spec_units(
    scenario: Scenario, spec: Union[Spec, Mapping[str, ScenarioInfo]]
) -> None:
    """Add every entry of ``spec["add"].set["unit"]`` to the Platform of `scenario`."""
    for unit in spec["add"].set["unit"]:
        # Plain values are wrapped so that `id` and `name` are always available
        code = unit if isinstance(unit, Code) else Code(id=unit, name=unit)
        _add_unit(scenario.platform, code.id, str(code.name))


def apply_spec(
    scenario: Scenario,
    spec: Union[Spec, Mapping[str, ScenarioInfo]],
    data: Optional[Callable] = None,
    **options,
):
    """Apply `spec` to `scenario`.

    Parameters
    ----------
    scenario : .Scenario
        Scenario to be checked and modified.
    spec : .Spec
        Specification of changes to make to `scenario`.
    data : callable, optional
        Function to add data to `scenario`. `data` can either manipulate the scenario
        directly, or return a :class:`dict` compatible with :func:`.add_par_data`.
        It is called as ``data(scenario, dry_run=dry_run)``.

    Other parameters
    ----------------
    dry_run : bool
        Don't modify `scenario`; only show what would be done. Default
        :obj:`False`. Exceptions will still be raised if the elements from
        ``spec['required']`` are missing; this serves as a check that the scenario
        has the required features for applying the spec.
    fast : bool
        Do not remove existing parameter data; increases speed on large scenarios.
    quiet : bool
        Only show log messages at level ``ERROR`` and higher. If :obj:`False`
        (default), show log messages at level ``DEBUG`` and higher. The level is set
        on this module's logger and is not restored afterwards.
    message : str
        Commit message.

    Raises
    ------
    ValueError
        If `scenario` lacks any element listed in ``spec["require"]``; the message
        names the set and the missing elements.

    See also
    --------
    .add_par_data
    .strip_par_data
    .Code
    .ScenarioInfo
    """
    dry_run = options.get("dry_run", False)
    fast = options.get("fast", False)

    log.setLevel(logging.ERROR if options.get("quiet", False) else logging.DEBUG)

    if not dry_run:
        try:
            scenario.remove_solution()
        except ValueError:
            # Deliberate best-effort; presumably raised when `scenario` has no
            # solution to remove — NOTE(review): confirm against message_ix
            pass
        maybe_check_out(scenario)

    dump: dict[str, pd.DataFrame] = {}  # Removed data
    # With fast=True, None is passed to strip_par_data() in place of `dump`
    strip_dump = None if fast else dump

    # Sort the list of sets by the number of dimensions; this places basic
    # (non-indexed) sets first. Elements for these sets must be added before elements
    # for indexed sets that may reference them.
    sets = sorted((len(scenario.idx_sets(s)), s) for s in scenario.set_list())

    # Existing 'region' codes stored on the Platform associated with `scenario`
    platform_regions = set(scenario.platform.regions()["region"])

    for _, set_name in sets:
        # Skip any set that is not mentioned at all in the spec
        if not any(len(info.set[set_name]) for info in spec.values()):
            continue

        _apply_spec_to_set(
            scenario, spec, set_name, platform_regions, dry_run, strip_dump
        )

    if not fast:
        n_removed = sum(len(d) for d in dump.values())
        log.info(f"{n_removed} total rows removed")

    # Add units to the Platform before adding data
    # NOTE(review): this also runs when dry_run=True — confirm that is intended
    _add_spec_units(scenario, spec)

    # Add data
    if callable(data):
        result = data(scenario, dry_run=dry_run)
        if result:
            # `data` function returned some data; use add_par_data()
            add_par_data(scenario, result, dry_run=dry_run)

    # Finalize
    log.info("Commit results.")
    maybe_commit(
        scenario,
        condition=not dry_run,
        message=options.get("message", f"{__name__}.apply_spec()"),
    )


def ellipsize(elements: list) -> str:
    """Return a compact, comma-separated rendering of `elements`.

    A list of 5 or fewer elements is rendered in full. A longer list is abbreviated
    to its first two and last two elements, with "..." between them.
    """
    shown = elements
    if len(elements) > 5:
        # Keep only the head and tail of a long list
        shown = elements[:2] + ["..."] + elements[-2:]
    return ", ".join(str(item) for item in shown)