Source code for message_ix_models.project.engage.workflow

"""ENGAGE workflow pieces for reuse with :class:`message_ix_models.Workflow`.

These functions emulate the collective behaviour of
:class:`message_data.engage.runscript_main`,
:class:`message_data.engage.scenario_runner` and the associated configuration, but are
adapted to be reusable, particularly in the Workflow pattern used in e.g.
:mod:`.project.navigate`.
"""

import logging
from copy import copy, deepcopy
from dataclasses import dataclass, field
from typing import TYPE_CHECKING, Literal, Optional, Union

from iam_units import convert_gwp
from message_ix import Scenario

from message_ix_models import Context, ScenarioInfo
from message_ix_models.model import workflow as model_workflow
from message_ix_models.util import HAS_MESSAGE_DATA, broadcast, identify_nodes
from message_ix_models.workflow import Workflow

if TYPE_CHECKING:
    from typing import TypedDict

    class CommonArgs(TypedDict):
        relation_name: str
        reg: str


if HAS_MESSAGE_DATA:
    # NB This module has not been migrated from message_data.projects.engage.
    from message_data.projects.engage.scenario_runner import ScenarioRunner
else:
    ScenarioRunner = type("ScenarioRunner", (), {})


log = logging.getLogger(__name__)


[docs] @dataclass class PolicyConfig(model_workflow.Config): """Configuration for the 3-step ENGAGE workflow for climate policy scenarios.""" #: Label of the climate policy scenario, often related to a global carbon budget in #: Gt CO₂ for the 21st century (varies). label: str = "" #: Which steps of the ENGAGE workflow to run. Empty list = don't run any steps. steps: list[int] = field(default_factory=lambda: [1, 2, 3]) #: In :func:`step_1`, actual quantity of the carbon budget to be imposed , or the #: value "calc", in which case the value is calculated from :attr:`label` by #: :meth:`.ScenarioRunner.calc_budget`. budget: Union[int, Literal["calc"]] = "calc" #: In :func:`step_3`, optional information on a second scenario from which to copy #: ``tax_emission`` data. tax_emission_scenario: dict = field(default_factory=dict) #: In :func:`step_3`, emission types or categories (``type_emission``) for which to #: apply values for ``tax_emission``. step_3_type_emission: list[str] = field(default_factory=lambda: ["TCE_non-CO2"])
[docs] def calc_hist_cum_CO2( context: Context, scen: Scenario, info: ScenarioInfo ) -> tuple[float, float]: """Calculate historic CO2 emissions. Adapted from :meth:`.engage.scenario_runner.ScenarioRunner.calc_hist_cum_CO2`, with the following differences: - Reported emissions are retrieved for a region with the code "GLB region (R##)", based on `context`. This allows the method to work with either R11 or R12 models. Returns ------- float Cumulative emissions [megatonne CO₂]. float Total duration of historical periods [years]. """ # TODO carefully check whether self.scen and scen being different scenarios would # impact behaviour # Original comment: Filters years between 2010 and first_model_year # TODO clarify the difference between the comment and the code years = list(filter(lambda y: 2020 <= y < info.y0, info.set["year"])) df = scen.timeseries( region=f"GLB region ({context.model.regions})", variable="Emissions|CO2" ) df = df[df.year.isin(years)] df_emis = df[["year", "value"]].set_index("year") df_duration = scen.par("duration_period", filters={"year": years})[ ["year", "value"] ].set_index("year") # Manually set 2018 and 2020 multiplication factor df_duration.at[2018, "value"] = 0.5 df_duration.at[2020, "value"] = 3.5 # manually set 2018 Value df_emis.at[2018, "value"] = 41000 value = (df_emis * df_duration).value.sum() hist_years = df_duration.value.sum() return (value, hist_years)
[docs] def calc_budget( context: Context, scenario: Scenario, bdgt: Union[float, str], method: Union[float, Literal["calc"]], type_emission="TCE", ) -> None: """Calculate **and apply** budget. Adapted from :meth:`.engage.scenario_runner.ScenarioRunner.calc_budget`, with the following differences: - :func:`.calc_hist_cum_CO2` above is called, instead of the method of the same name on ScenarioRunner. - `scenario` is an argument, not retrieved from the `scen` attribute of ScenarioRunner. - :mod:`iam_units` is used for unit conversion. .. todo:: Fix confusing semantics, here and/or in the original; e.g. use a `value` argument and/or config class. Parameters ---------- bdgt : float or str If `method` is "calc", this must be the budget expressed as total gigatonnes of CO₂ for the period 2010–2100. Otherwise, the argument is ignored. method : float or str Literal "calc" to calculate a constraint budget based on `bdgt`; otherwise, budget constraint value expressed as average Mt C-eq / a. """ from message_data.tools.utilities import add_budget if method == "calc": info = ScenarioInfo(scenario) # Target is provided in cumulative Gt 2010-2100 value = float(bdgt) # Convert Gt CO2 to Mt CO2 value *= 1000 # NB the original passed ScenarioRunner.base, rather than ScenarioRunner.scen # TODO carefully check whether this distinction meant anything hist_cum, hist_year = calc_hist_cum_CO2(context, scenario, info) value -= hist_cum # Conversion to MtC value *= convert_gwp(None, (value, "Mt CO2"), "C").magnitude # Divide by the number of years covered. "-2" is because in the calculation of # the budget we account for 2 more years than the model sees. value /= (info.Y[-1] - info.Y[0] + 10) - 2 # The amount of years over which the budget is distributed should be 82 years # (from 2018 to 2100). The number of historic years are subtracted. # (do the extra 10 years need to be added)? # value /= 82 - hist_year + 10 else: value = method add_budget(scenario, value, type_emission=type_emission)
[docs] def step_0(context: Context, scenario: Scenario, **kwargs) -> Scenario: """Preparation for the ENGAGE climate policy workflow. These operations must occur no matter which combinations of 1 or more of :func:`step_1`, :func:`step_2`, and/or :func:`step_3` are to be run on `scenario`. Compare :func:`.model.workflow.step_0`. In this function: - :func:`.add_AFOLU_CO2_accounting` is called with :attr:`METHOD.A <.add_AFOLU_CO2_accounting.METHOD.A>`, which is no longer the default. - :func:`.add_alternative_TCE_accounting` is called with :attr:`METHOD.A <.add_alternative_TCE_accounting.METHOD.A>`, which is no longer the default. """ from message_ix_models.tools import ( add_AFOLU_CO2_accounting, add_alternative_TCE_accounting, add_CO2_emission_constraint, add_FFI_CO2_accounting, remove_emission_bounds, ) try: scenario.remove_solution() except ValueError: pass # Solution did not exist remove_emission_bounds.main(scenario) # Identify the node codelist used by `scenario` (in case it is not set on `context`) context.model.regions = identify_nodes(scenario) kw: "CommonArgs" = dict( relation_name=context.model.relation_global_co2, reg=f"{context.model.regions}_GLB", ) # “Step1.3 Make changes required to run the ENGAGE setup” (per .runscript_main) log.info("Add separate FFI and AFOLU CO2 accounting") add_FFI_CO2_accounting.main(scenario, **kw) add_AFOLU_CO2_accounting.add_AFOLU_CO2_accounting( scenario, method=add_AFOLU_CO2_accounting.METHOD.A, **kw ) log.info("Add alternative TCE accounting") add_alternative_TCE_accounting.main( scenario, method=add_alternative_TCE_accounting.METHOD.A ) add_CO2_emission_constraint.main( scenario, **kw, constraint_value=0.0, type_rel="lower" ) return scenario
[docs] def step_1(context: Context, scenario: Scenario, config: PolicyConfig) -> Scenario: """Step 1 of the ENGAGE climate policy workflow. If the :attr:`~.PolicyConfig.method` attribute of `policy_config` is "calc", then `scenario` must contain time series data for variable="Emissions|CO2" and region="GLB region (R##)"; see :func:`calc_hist_cum_CO2`. This is typically provided by the legacy reporting. If the attribute has a float value specifying a budget directly, this condition does not apply. """ # Calculate **and apply** budget calc_budget( context, scenario, bdgt=config.label, method=config.budget, type_emission="TCE_CO2", ) return scenario
[docs] def step_2(context: Context, scenario: Scenario, config: PolicyConfig) -> Scenario: """Step 2 of the ENGAGE climate policy workflow.""" from message_ix_models.tools import add_emission_trajectory # Retrieve a pandas.DataFrame with the CO2 emissions trajectory # # NB this method: # - does not use any class or context attributes, so it can be called on any # instance of ScenarioRunner. # TODO separate the method from the class as a stand-alone function # - does not require legacy reporting output; only the variable "EMISS", i.e. # `scenario` must have solution data. sr = ScenarioRunner(context) df = sr.retr_CO2_trajectory(scenario) try: scenario.remove_solution() except ValueError: pass # Solution did not exist # Add this trajectory as bound_emission values add_emission_trajectory.main( scenario, data=df, type_emission="TCE_CO2", unit="Mt C/yr", remove_bounds_emission=True, ) with scenario.transact(message="Remove lower bound on global total CO₂ emissions"): name = "relation_lower" filters = dict(relation=[context.model.relation_global_co2]) scenario.remove_par(name, scenario.par(name, filters=filters)) return scenario
[docs] def retr_CO2_price(scen: Scenario): """Retrieve ``PRICE_EMISSION`` data and transform for use as ``tax_emission``. This is identical to :meth:`ScenarioRunner.retr_CO2_price`, except without the argument `new_type_emission`. For the same effect, use pandas built-in :meth:`~.DataFrame.assign` or, to create data for multiple ``type_emission``, use :func:`message_ix_models.util.broadcast`. """ return ( scen.var("PRICE_EMISSION") .assign(unit="USD/tC", type_emission=None) .rename(columns={"lvl": "value", "year": "type_year"}) .drop("mrg", axis=1) )
[docs] def step_3(context: Context, scenario: Scenario, config: PolicyConfig) -> Scenario: """Step 3 of the ENGAGE climate policy workflow.""" if config.tax_emission_scenario: # Retrieve CO2 prices from a different scenario source = Scenario(scenario.platform, **config.tax_emission_scenario) else: # Retrieve CO2 prices from `scenario` itself source = scenario # Retrieve a data frame with CO₂ prices, and apply specific ``type_emission`` df = retr_CO2_price(source).pipe( broadcast, type_emission=config.step_3_type_emission ) del source # No longer used → free memory try: scenario.remove_solution() except ValueError: pass # Solution did not exist with scenario.transact(message=f"Add price for {config.step_3_type_emission}"): scenario.add_par("tax_emission", df) # As in step_2, remove the lower bound on global CO2 emissions. This is # necessary if step_2 was not run (for instance, NAVIGATE T6.2 protocol) name = "relation_lower" filters = dict(relation=[context.model.relation_global_co2]) scenario.remove_par(name, scenario.par(name, filters=filters)) return scenario
[docs] def add_steps( workflow: Workflow, base: str, config: PolicyConfig, name: Optional[str] = None ) -> str: """Add steps to `workflow` for running ENGAGE scenarios on `base`. Parameters ---------- workflow base Prior workflow step/scenario to start from. config Depending on the :attr:`~.PolicyConfig.steps` attribute, from 0 to 3 workflow steps are added. name : str, optional Name template for the added steps. Returns ------- str name of the last workflow step added, or `base` if none are added. """ workflow.graph["context"].setdefault("run_reporting_only", False) # Base name for the added steps name_root = f"{name or base} EN" # Label for steps label = str(config.label).replace(" ", "_") # Model and scenario name for the scenario produced by the base step info, _ = workflow.guess_target(base, "scenario") # Template for new model/scenario name at each step target = f"{info['model']}/{name or base} ENGAGE_{label}_step-{{}}" # Model to solve solve_model = config.solve["model"] # If config.steps is non-empty, insert step_0. Otherwise, leave empty steps = copy(config.steps) if len(steps): steps.insert(0, 0) # Iterate over [0, 1, 2, 3] or fewer ENGAGE `steps` s = base # Current step name for step in steps: # Duplicate `config` and modify for this particular step cfg = deepcopy(config) if solve_model == "MESSAGE-MACRO" and step > 1: # Give scenario info from which to copy "DEMAND" variable data; data is # copied to the "demand" parameter cfg.demand_scenario.update(model=info["model"]) cfg.demand_scenario.setdefault( "scenario", target.split("/")[-1].format(step - 1) ) if step == 2: # Do not solve MESSAGE-MACRO for step 2, even if doing so for steps 1/3 cfg.solve.update(model="MESSAGE") elif step == 3: # May help improve long solve times, per # https://iiasa-ece.slack.com/archives/C03M5NX9X0D/p1678703978634529? # thread_ts=1678701550.474199&cid=C03M5NX9X0D cfg.solve["solve_options"].update(predual=1) # Add step s = workflow.add_step( f"{name_root}{step}", s, # Get a reference to the function step_0() etc. action=globals()[f"step_{step}"], # Always clone to new URL; if step_0, then shift the model horizon clone=dict(keep_solution=True), target=target.format(step), # Target URL config=cfg, ) if step > 0: # Add a step to solve the scenario (except for after step_0); update the # step name for the next loop iteration s = workflow.add_step( f"{s} solved", s, model_workflow.solve, config=cfg, set_as_default=True ) # Return the name of the last of the added steps return s