# Source code for message_ix_models.project.navigate.report

"""Reporting for NAVIGATE."""

import logging
import re
from collections.abc import Callable, Collection
from datetime import date
from itertools import count, product
from pathlib import Path
from typing import TYPE_CHECKING, Optional

import pandas as pd
from message_ix import Reporter, Scenario
from sdmx.model.v21 import Code

from message_ix_models import Context
from message_ix_models.model.structure import get_codes
from message_ix_models.project.navigate import iter_scenario_codes
from message_ix_models.report.util import copy_ts
from message_ix_models.util import identify_nodes, nodes_ex_world, private_data_path

if TYPE_CHECKING:
    import message_data.tools.prep_submission

log = logging.getLogger(__name__)


# Functions that perform mapping of certain codes/labels


def _model_name(value: str) -> str:
    """Return a model ID for submission from the NAVIGATE models codelist.

    `value` should be a model name (:attr:`.Scenario.model`) constructed by
    :mod:`.navigate.workflow`.

    The suffix " (NAVIGATE)" used in the internal IIASA ECE database is removed.
    """
    return value.split(" (NAVIGATE)")[0]


def _scenario_name(context: Context, value: str) -> Optional[str]:
    """Return a scenario ID for submission from the NAVIGATE scenarios codelist.

    `value` should be a scenario name (:attr:`.Scenario.scenario`) constructed by
    :mod:`.navigate.workflow`.

    Parameters
    ----------
    context
        Passed to :func:`.iter_scenario_codes`; ``context.navigate.dsd`` selects the
        fallback behaviour described below.
    value
        Scenario name to transform.

    Returns
    -------
    str
        The first candidate ID that matches the ID of a code in the codelist.
        If nothing matches and ``context.navigate.dsd`` is "iiasa-ece", some string
        is always returned: "baseline" as-is, otherwise the last candidate
        (``PEP-{value}``).
    None
        If nothing matches and ``context.navigate.dsd`` is anything else.
    """
    # Transform a complex scenario name from the workflow to the corresponding NAVIGATE
    # identifier

    # Discard "+B" and/or "+MACRO"
    value = re.sub(r"(\+(B|MACRO))", "", value)

    # Reduce names that end in "ENGAGE_…_step-N" to their leading "(15C|2C|20C|NPi)-…"
    # part
    match = re.match(r"^((15C|2C|20C|NPi)-(\w*)).ENGAGE_(?:15C|20C)_step-[123]$", value)
    if match:
        # Diagnostic only: goes to the module logger rather than stdout
        log.debug(f"{match = }")
        value = match.group(1)

    candidates = (
        f"NAV_Dem-{value}",
        # _u/_d scenarios are not implemented currently. Allow a match with the former.
        f"NAV_Dem-{value}_u",
        f"PC-{value}",
        f"PEP-{value}",
    )
    # Codelist order takes precedence over candidate order: the first code that equals
    # any candidate wins
    for code, candidate in product(iter_scenario_codes(context), candidates):
        if code.id == candidate:
            return candidate

    if context.navigate.dsd != "iiasa-ece":
        return None

    # Explicit fallback. Do not rely on the loop variable `candidate`, which is unbound
    # when the codelist is empty.
    # NOTE(review): falling back to the "PEP-" candidate mirrors what the leaked loop
    # variable used to provide — confirm that this is the intended default.
    return value if value == "baseline" else candidates[-1]


def _region(codelist_id: str, value: str) -> str:
    """Return a region ID for submission from the NAVIGATE MESSAGE regions codelist."""
    # Discard the prefix
    return value.split(f"{codelist_id}_")[-1]


#: Regular expression patterns and replacements for variable names. These are applied in
#: the :mod:`prep_submission` step to the full variable names generated by the legacy
#: reporting.
#:
#: Each entry is a 2-tuple of (compiled pattern, replacement template). The entries are
#: applied in order, each one to the output of the previous one, so an earlier rule can
#: change whether a later rule matches.
VARIABLE_SUB = (
    (re.compile(r"^Carbon Sequestration\|CCS(.*)$"), r"Carbon Capture|Storage\g<1>"),
    (re.compile(r"^Carbon Sequestration(\|Land Use.*)$"), r"Carbon Removal\g<1>"),
    (re.compile(r"^(Final Energy\|)AFOFI"), r"\g<1>Agriculture"),
    # NB this does *not* apply to Final Energy|Solids|Coal, only names with additional
    #    parts
    (re.compile(r"^(Final Energy\|.*\|Solids\|)Coal"), r"\g<1>Fossil"),
    (
        re.compile(
            r"^((Final Energy\|Transportation|Price\|Secondary Energy)\|Liquids\|)Oil"
        ),
        r"\g<1>Fossil",
    ),
    (
        re.compile(
            r"^((Price\|Final Energy\|Residential|Secondary Energy)\|Gases\|)"
            "Natural Gas"
        ),
        r"\g<1>Fossil",
    ),
    (re.compile(r"^(Secondary Energy\|Solids\|)Coal"), r"\g<1>Fossil"),
    (re.compile(r"^(Production\|)Cement"), r"\g<1>Non-Metallic Minerals|Cement|Volume"),
    # The negative lookahead leaves "Production|Chemicals|High Value Chemicals" for the
    # next rule. Without it, this rule would insert "|Volume" before the
    # "|High Value Chemicals" part and the next rule could never match.
    (
        re.compile(r"^(Production\|)Chemicals(?!\|High Value Chemicals)"),
        r"\g<1>Chemicals|Volume",
    ),
    (
        re.compile(r"^(Production\|Chemicals\|)High Value Chemicals"),
        r"\g<1>High Value Chemicals|Volume",
    ),
    # "|" needs no escape in a replacement template; an escaped "\|" would be emitted
    # with a literal backslash
    (
        re.compile(r"^(Production\|Non-Ferrous Metals\|)Aluminium"),
        r"\g<1>Aluminium|Volume",
    ),
    (re.compile(r"\|Steel"), r"|Iron and Steel"),
    (re.compile(r"^(Production\|Iron and Steel)$"), r"\g<1>|Volume"),
    (
        re.compile(
            r"^(Emissions\|CO2\|Energy\|Demand\|Industry\|Non-Metallic Minerals)\|"
            "Cement"
        ),
        r"\g<1>",
    ),
    (
        re.compile(
            r"^(Emissions\|CO2\|Energy\|Demand\|Industry\|Non-Ferrous Metals)\|"
            "Aluminium"
        ),
        r"\g<1>",
    ),
    # For NGFS, apparently not needed for NAVIGATE
    # ("Commercial", "Residential and Commercial|Commercial"),
    # ("Residential", "Residential and Commercial|Residential"),
    # ("High Value Chemicals", "High value chemicals"),
    # (r"Non-Ferrous Metals\|Aluminium", "Non-ferrous metals"),
    # (r"Non-Metallic Minerals\|Cement", "Cement"),
    # (r"Liquids\|Biomass$", "Liquids|Bioenergy"),
    # (
    #     re.compile(
    #         "^(?:(Investment\|Infrastructure|Price\|Carbon\|Demand)\|)Transport"
    #     ),
    #     "Transportation",
    # ),
    # (re.compile(r"(?:Price\|)Non-Metallic Minerals\|Cement"), "Industry|Cement"),
    # (re.compile(r"(?:Production\|)Primary\|Chemicals"), "Chemicals"),
)


def _variable(value: str) -> str:
    """Return `value` with every substitution in :data:`VARIABLE_SUB` applied.

    The substitutions are applied in the order they appear in :data:`VARIABLE_SUB`,
    each one to the output of the previous one. A name that no pattern matches is
    returned unchanged.
    """
    name = value
    for regex, replacement in VARIABLE_SUB:
        name = regex.sub(replacement, name)
    return name


#: Unit replacements, merged into the :mod:`prep_submission` configuration by
#: :func:`gen_config` (``cfg.unit_map.update(UNIT_MAP)``).
#:
#: Keys are 2-tuples of (original unit, variable selector); values are the unit to
#: use instead. The selector is a string, :data:`None`, or a compiled regular
#: expression. Per the FIXME below, string selectors are compared with
#: :meth:`str.startswith`, so the order of entries matters.
#: NOTE(review): presumably a :data:`None` selector applies to every variable with
#: that unit — confirm against prep_submission.map_units().
UNIT_MAP = {
    ("GW", "Capacity Additions|Electricity|Storage Capacity"): "GWh/yr",
    ("GW", "Capacity Additions|"): "GW/yr",
    ("Mt NOx/yr", None): "Mt NO2/yr",
    ("US$2010/t CO2 or local currency/t CO2", None): "US$2010/t CO2",
    ("US$2010/GJ or local currency/GJ", None): "US$2010/GJ",
    #
    # The following based on error output from the NAVIGATE scenario Explorer
    # TODO fix these in the reporting per se
    ("My/yr", "Collected Scrap|Non-Ferrous Metals"): "Mt/yr",
    ("My/yr", "Total Scrap|Non-Ferrous Metals"): "Mt/yr",
    ("Mt / a", "Emissions|BC"): "Mt BC/yr",
    ("Mt / a", "Emissions|CF4"): "kt CF4/yr",  # TODO check which prefix is correct
    ("Mt / a", "Emissions|CH4"): "Mt CH4/yr",
    # FIXME this is fragile; correct behaviour depends on CO2 appearing first in the
    #       list because prep_submission.map_units() uses str.startswith. Probably use
    #       regular expressions instead
    ("Mt / a", "Emissions|CO2"): "Mt CO2/yr",
    ("Mt / a", "Emissions|CO"): "Mt CO/yr",
    ("Mt / a", "Emissions|N2O"): "kt N2O/yr",  # TODO check which prefix is correct
    ("Mt / a", "Emissions|NH3"): "Mt NH3/yr",
    ("Mt / a", "Emissions|NOx"): "Mt NO2/yr",
    ("Mt / a", "Emissions|OC"): "Mt OC/yr",
    ("million m3/yr", "Forestry Production|Forest Residues"): "million t DM/yr",
    (
        "Index (2005 = 1)",
        "Price|Agriculture|Non-Energy Crops and Livestock|Index",
    ): "Index (2020 = 1)",
    (
        "Index (2005 = 1)",
        "Price|Agriculture|Non-Energy Crops|Index",
    ): "Index (2020 = 1)",
    ("EJ/yr", re.compile("^Trade$")): "billion US$2010/yr",
    ("Mt CO2-equiv/yr", re.compile("^Trade$")): "billion US$2010/yr",
}


def gen_config(
    context: Context, workflow_dir: Path, scenarios: Collection[Scenario]
) -> "message_data.tools.prep_submission.Config":
    """Generate configuration for :mod:`.prep_submission`.

    Parameters
    ----------
    context
        Used to locate output and source directories (:meth:`get_local_path`) and to
        read the target DSD, ``context.navigate.dsd``.
    workflow_dir
        The base path (directory) for the NAVIGATE workflow repository.
    scenarios
        Collection of scenarios. Scenarios for which :func:`_scenario_name` returns
        :data:`None` are skipped.

    Returns
    -------
    message_data.tools.prep_submission.Config
        With scenario, region, unit, and variable name mappings populated.

    Raises
    ------
    AssertionError
        If the scenarios that are not skipped do not share exactly 1 node code list.
    """
    from message_data.tools.prep_submission import Config, ScenarioConfig

    # Identify the file path for output: the first "{date}_{index}" name not yet in use
    today = date.today().strftime("%Y-%m-%d")
    dsd_label = "" if context.navigate.dsd == "navigate" else f"_{context.navigate.dsd}"
    for index in count():
        out_file = context.get_local_path("report", f"{today}_{index}{dsd_label}.xlsx")
        if not out_file.exists():
            break

    # Create base configuration for prep_submission
    cfg = Config(
        out_fil=out_file, source_dir=context.get_local_path("report", "legacy")
    )

    # Read the variable list to keep from the NAVIGATE repository
    cfg.read_nomenclature(workflow_dir)

    # Iterate over scenarios to include
    regions = set()
    for s in scenarios:
        _name = _scenario_name(context, s.scenario)
        if _name is None:
            log.info(f"No target scenario name for {s.url}; skip")
            continue

        cfg.scenario[(s.model, s.scenario)] = ScenarioConfig(
            model=_model_name(s.model),
            scenario=_name,
            reference_scenario="baseline",
            final=True,
        )

        # Identify the node code list for region mapping, below
        regions.add(identify_nodes(s))

        # Construct a filename to read the variable names reported, below
        filename = legacy_output_path(cfg.source_dir, s)

    # Exactly 1 node code list is required. Passing this check also implies that at
    # least one scenario was handled above, so `filename` is bound.
    assert 1 == len(regions), (
        f"{len(scenarios)} scenarios have {len(regions)} distinct regions: {regions}"
    )
    node_cl = list(regions)[0]

    # Region name mapping
    nodes = get_codes(f"node/{node_cl}")
    nodes = nodes[nodes.index(Code(id="World"))].child

    # map e.g. "AFR" to "R12_AFR". The former are produced by legacy reporting and/or
    # its interaction with the IIASA ECE Oracle database and particular region aliases
    # configured through ixmp that exist only in that database.
    cfg.name_map["Region"] = {
        _region(node_cl, n): n for n in map(str, nodes_ex_world(nodes))
    }
    log.debug(
        f"Region code mapping for target DSD {context.navigate.dsd!r}:\n"
        + repr(cfg.name_map["Region"])
    )

    # Unit mapping
    cfg.unit_map.update(UNIT_MAP)

    # Variable name mapping
    # Names from the legacy reporting output. Arbitrarily used the filename for the last
    # scenario handled in the above loop; this assumes that the set of variable names in
    # each file is the same (as they should be).
    names_1 = set(pd.read_excel(filename, usecols=["Variable"])["Variable"])

    # Names from the legacy reporting configuration
    names_2 = set(
        pd.read_csv(
            private_data_path("report", "default_variable_definitions.csv"),
            usecols=["Variable"],
        )["Variable"]
    )

    # Names from configuration
    names_3 = cfg.variable_keep

    # Display diagnostic information
    log.info(
        f"""Number of variable names
in reporting output {len(names_1) = }
in default_variable_definitions.csv {len(names_2) = }
in NAVIGATE variables.yaml {len(names_3) = }
{len(names_1 - names_2) = }
{len(names_2 - names_1) = }
{len(names_1 | names_2) = }
{len(names_3 - (names_1 | names_2)) = }"""
    )

    # Iterate over names_1 and names_2
    cfg.name_map["Variable"] = dict()
    for var in sorted(names_1 | names_2):
        # Attempt to transform the variable name
        target = _variable(var)

        # Name is different; record it as one to be mapped
        if target != var:
            cfg.name_map["Variable"][var] = target

    # Log more diagnostic info
    names_4 = set(cfg.name_map["Variable"].values())
    log.info(
        f"""Variable mappings constructed for {len(names_4)} names
of which {len(names_3 & names_4)} are accepted by NAVIGATE"""
    )

    return cfg
def callback(rep: Reporter, context: Context) -> None:
    """:meth:`.prepare_reporter` callback for NAVIGATE.

    Adds the following keys to `rep`:

    - "navigate remove ts", via the "remove_ts" operation.
    - "navigate all": the list of top-level keys for each of the buildings, materials,
      and transport reporting that is enabled on ``context.navigate``.
    - "navigate copy ts": a list containing the key returned by :func:`.copy_ts` if
      ``context.navigate.copy_ts`` is set; otherwise an empty list.
    """
    from message_ix_models.report import register

    # Set up reporting for each of the model variants
    all_keys = []
    for name, k in (
        ("buildings", "buildings all"),
        ("material", "materials all"),
        ("transport", "transport iamc all"),  # Excludes plots
    ):
        if getattr(context.navigate, name):
            register(f"model.{name}")
            all_keys.append(k)

    rep.add("remove_ts", "navigate remove ts", "scenario", "config", "y0")
    rep.add("navigate all", all_keys)

    # Add an operation to copy time-series data from a corresponding reference scenario
    copy_ts_keys = []
    if context.navigate.copy_ts:
        # URL of the other scenario
        platform_name = rep.get("scenario").platform.name
        si = context.navigate.copy_ts
        other_url = f"ixmp://{platform_name}/{si['model']}/{si['scenario']}"

        # Identify period(s) to copy: 2020 inclusive to the period before y0
        y0 = rep.get("y::model")[0]
        y = rep.get("y")
        to_copy = y[y.index(2020) : y.index(y0)]

        log.info(f"Will infill reporting data for year={to_copy} from {other_url}")

        # Add several steps to copy the data
        key = copy_ts(rep, other_url, dict(year=to_copy))
        copy_ts_keys.append(key)

    # Possibly an empty list → no-op
    rep.add("navigate copy ts", copy_ts_keys)
def legacy_output_path(base_path: Path, scenario: Scenario) -> Path:
    """Return the path where the legacy reporting writes output for `scenario`.

    The file name is ``{model}_{scenario}.xlsx``, built from the
    :attr:`~.Scenario.model` and :attr:`~.Scenario.scenario` attributes of `scenario`,
    within the directory `base_path`.

    .. todo:: provide this from a function within the legacy reporting submodule; call
       that function both here and in :func:`.pp_utils.write_xlsx`.
    """
    return base_path.joinpath(f"{scenario.model}_{scenario.scenario}.xlsx")
def return_func_dict() -> dict[str, Callable]:
    """Hook for legacy reporting.

    This function contains a crude hack. :func:`.iamc_report_hackathon.report`, per a
    "run config" YAML file, e.g. :file:`data/report/navigate-rc.yaml`, finds and calls
    this function to retrieve the list of functions ("tables") in the file. At that
    point, we modify the lists of technologies defined in
    :data:`.default_tables.TECHS`.

    Returns
    -------
    dict
        The merged ``func_dict`` of every enabled model variant that provides a tables
        submodule; empty if none does.
    """
    from importlib import import_module

    from message_ix_models.report.legacy import default_tables

    # Retrieve a context reference
    # FIXME Don't depend on this being the most recent instance; pass a particular
    #       instance to get_func_dict
    config = Context.get_instance(-1).navigate

    # Label for the configured variant
    variant = ""

    # "Table" functions from submodules that override the defaults
    functions: dict[str, Callable] = {}

    for name, tables_name in (
        ("buildings", None),
        ("material", "tables"),
        ("transport", None),
    ):
        if not getattr(config, name):
            # This module is disabled; do not configure legacy reporting
            continue

        # Update the variant name
        variant += name[0].upper()

        # Invoke a function named configure_legacy_reporting() from each module to
        # adjust `TECHS`
        # FIXME This TECHS dictionary is not present in the version of the legacy
        #       reporting migrated to message_ix_models. It, or this code, must be
        #       updated in order to be usable.
        module_name = f"message_data.model.{name}.report"
        import_module(module_name).configure_legacy_reporting(
            default_tables.TECHS  # type: ignore [attr-defined]
        )

        if tables_name is None:
            # .model.buildings and .model.transport: no `func_dict`, because these do
            # not override any of the legacy reporting functions
            continue

        # Update `functions` using `func_dict` from the `tables_name` submodule
        try:
            functions.update(import_module(f"{module_name}.{tables_name}").func_dict)
        except ImportError:
            # Best effort: keep the default functions, but leave a trace
            log.debug(f"Could not import {module_name}.{tables_name}; skip")

    log.debug(
        f"Configured legacy reporting for -{variant}- model variant:\n"
        f"{default_tables.TECHS = }"  # type: ignore [attr-defined]
    )

    return functions