Source code for message_ix_models.project.ssp.transport

"""Postprocess aviation emissions for SSP 2024."""

import logging
import re
from collections.abc import Hashable
from enum import Enum, auto
from functools import cache
from itertools import product
from typing import TYPE_CHECKING, Literal, Optional

import genno
import pandas as pd
from genno import Key, quote

from message_ix_models import Context
from message_ix_models.model.structure import get_codelist
from message_ix_models.tools.iamc import iamc_like_data_for_query, to_quantity
from message_ix_models.util import minimum_version
from message_ix_models.util.genno import Keys

if TYPE_CHECKING:
    import pathlib

    import sdmx.model.common
    from genno import Computer
    from genno.types import AnyQuantity, TQuantity

log = logging.getLogger(__name__)

#: Dimensions of several quantities.
DIMS = "e n t y UNIT".split()

#: Expression used to select and extract :math:`(e, s, t)` dimension coordinates from
#: variable codes in :func:`v_to_emi_coords`.
EXPR_EMI = re.compile(
    r"""
    ^Emissions
      \|(?P<e>[^\|]+)
      (\|
        (?P<s>
          Energy(\|(Combustion|Demand))?
          | Fossil.Fuels.and.Industry
        )
        (\|
          (?P<t>(Bunkers|Transportation).*)
        )?
      )?
    $""",
    flags=re.VERBOSE,
)

#: Expression used to select and extract :math:`(c)` dimension coordinates from variable
#: codes in :func:`v_to_fe_coords`.
EXPR_FE = re.compile(
    r"""^Final.Energy\|
    (?P<t>Bunkers(\|International.Aviation)?|(Transportation(|.\(w/.bunkers\))))
    \|?
    (?P<c>|Electricity|Liquids\|Oil)
    $""",
    flags=re.VERBOSE,
)

#: Keywords for :func:`.iamc_like_data_for_query` / :func:`.to_quantity`.
IAMC_KW = dict(non_iso_3166="keep", query="Model != ''", unique="MODEL SCENARIO")

#: Common label / :attr:`.Key.name` / :attr:`.Key.tag`.
L = "AIR emi"

#: Fixed keys prepared by :func:`.get_computer` and other functions:
#:
#: - :py:`.bcast`: the output of :func:`.broadcast_st_emi`.
#: - :py:`.input`: input data from file or calling code, converted to Quantity.
#: - :py:`.emi`: computed aviation emissions.
#: - :py:`.emi_in`: input data for aviation and other transport emissions, to be
#:   adjusted or overwritten.
#: - :py:`.fe`: computed final energy data.
#: - :py:`.fe_in`: input data for transport final energy, to be adjusted or overwritten.
K = Keys(
    bcast=f"broadcast:s-t:{L}",
    bcast_other=f"broadcast:e-s-t:{L}+other",
    input=f"input:n-y-VARIABLE-UNIT:{L}",
    emi=f"emission:e-n-s-t-y-UNIT:{L}",
    emi_in=f"emission:e-n-s-t-y-UNIT:{L}+in",
    fe_in=f"fe:c-n-t-y:{L}+in",
    fe_out=f"fe:c-n-t-y:{L}+out",
    units=f"units:e-UNIT:{L}",
)


[docs] class METHOD(Enum): """Method for computing emissions.""" #: See :func:`.method_A`. A = auto() #: See :func:`.method_B`. B = auto() #: See :func:`.method_C`. C = auto()
[docs] def aviation_emi_share(ref: "TQuantity") -> "TQuantity": """Return (dummy) data for the share of aviation in emissions. Currently this returns exactly the value `0.2`. Parameters ---------- ref : Reference quantity. The dimensions and coordinates :math:`(n, e, y)` of the returned value exactly match `ref`. Returns ------- genno.Quantity with dimensions :math:`(n, e, y)`. """ return ( genno.Quantity(0.2, units="dimensionless") .expand_dims({"e": sorted(ref.coords["e"].data)}) .expand_dims({"n": sorted(ref.coords["n"].data)}) .expand_dims({"y": sorted(ref.coords["y"].data)}) )
[docs] def broadcast_st_emi( version: Literal[1, 2], include_international: bool ) -> "AnyQuantity": """Quantity to re-add the :math:`(s, t)` dimensions for emission data. Parameters ---------- version : Version of ‘variable’ names supported by the current module. include_international : If :any:`True`, include "Transportation|Aviation|International" with magnitude 1.0. Otherwise, omit. Return ------ genno.Quantity with dimensions :math:`(s, t)`. If :py:`version=1`, the :math:`t`: values include: - +1.0 for t="Transportation|Aviation", a label with missing data. - -1.0 for t="Transportation|Road Rail and Domestic Shipping", a label with existing data from which the aviation total must be subtracted. If :py:`version=2`, the values include: - +1.0 for t="Bunkers" and t="Bunkers|International Aviation", labels with zeros in the input data file. - -1.0 for t="Transportation" and t="Transportation|Road Rail and Domestic Shipping", labels with existing data from which the aviation total must be subtracted. """ if version == 1: # pragma: no cover value = [1, -1, 1] t = [ "Transportation|Aviation", "Transportation|Road Rail and Domestic Shipping", "Transportation|Aviation|International", ] idx = slice(None) if include_international else slice(-1) elif version in (2, 3): value = [1, 1, -1, -1] t = [ "Bunkers", "Bunkers|International Aviation", "Transportation", "Transportation|Road Rail and Domestic Shipping", ] idx = slice(-2 if version == 3 else None) return genno.Quantity(value[idx], coords={"t": t[idx]}).expand_dims( {"s": ["Energy|Demand"]} )
[docs] def broadcast_est_emi_other(q_ref: "TQuantity") -> "TQuantity": """Quantity to broadcast values for other :math:`(e, s, t)`. - s = "Energy|Combustion" is included only for e in ("CH4", "CO2", "N2O"), because these are the only species in the input data with existing values. """ e, e_combustion = sorted(q_ref.coords["e"].data), ("CH4", "CO2", "N2O") s = [ "_T", "Energy", "Energy|Combustion", "Energy|Demand", "Fossil Fuels and Industry", ] dims = ["e", "s"] df = ( pd.DataFrame([list(e_s) for e_s in product(e, s)], columns=dims) .assign(value=1.0) .query("not (s == 'Energy|Combustion' and e not in @e_combustion)") .set_index(dims)["value"] ) del e_combustion return type(q_ref)(df, units="").expand_dims({"t": ["_T"]})
[docs] def broadcast_t_fe() -> "AnyQuantity": """Quantity to re-add the |t| dimension for final energy data.""" return genno.Quantity( pd.DataFrame( [ ["lightoil", "Bunkers", "", +1.0], ["lightoil", "Bunkers|International Aviation", "", +1.0], ["lightoil", "Bunkers", "Liquids|Oil", +1.0], ["lightoil", "Transportation", "", -1.0], ["lightoil", "Transportation", "Liquids|Oil", -1.0], ], columns=["c", "t", "c_new", "value"], ).set_index(["c", "t", "c_new"])["value"] )
[docs] def e_UNIT(cl_emission: "sdmx.model.common.Codelist") -> "AnyQuantity": """Return a quantity for broadcasting. Returns ------- genno.Quantity with one value :math:`Q_{e, UNIT} = 1.0` for every label |e| in `cl_emission`, with "UNIT" being the unit expression to be used with IAMC- structured data. Values are everywhere 1.0, except for species such as ``N2O`` that must be reported in kt rather than Mt. """ # Iterate over codes in the codelist data = [] for e in cl_emission: # Retrieve info from annotations i = {} for k, default in {"report": e.id, "unit-species": e.id, "units": "Mt"}.items(): try: i[k] = str(e.get_annotation(id=k).text) except KeyError: i[k] = default scale_factor = 1.0 if i["units"] == "Mt" else 1e3 data.append([i["report"], f"{i['units']} {i['unit-species']}/yr", scale_factor]) dims = "e UNIT value".split() return genno.Quantity( pd.DataFrame(data, columns=dims).set_index(dims[:-1])[dims[-1]] )
[docs] def finalize( q_all: "TQuantity", q_emi_update: "TQuantity", q_fe_update: "TQuantity", model_name: str, scenario_name: str, ) -> pd.DataFrame: """Finalize output. 1. Reattach "Model" and "Scenario" labels. 2. Reassemble the "Variable" dimension/coords of `q_update`; drop "e" and "t". 3. Convert both `q_all` and `q_update` to :class:`pandas.Series`; update the former with the contents of the latter. This retains all other, unmodified data in `q_all`. 4. Adjust to IAMC ‘wide’ structure. Parameters ---------- q_all : All data. Quantity with dimensions :math:`(n, y, UNIT, VARIABLE)`. q_update : Revised data to overwrite corresponding values in `q_all`. Quantity with dimensions :data:`DIMS`. """ def _expand(qty): return qty.expand_dims( {"Model": [model_name], "Scenario": [scenario_name]} ).rename({"n": "Region", "UNIT": "Unit", "VARIABLE": "Variable"}) # Convert `q_all` to pd.Series s_all = q_all.pipe(_expand).to_series() # - Convert `q_emi_update` to pd.Series # - Reassemble "Variable" codes. # - Drop dimensions (e, t). # - Align index with s_all. s_emi_update = ( q_emi_update.pipe(_expand) .to_frame() .reset_index() .assign( Variable=lambda df: ( "Emissions|" + df["e"] + "|" + df["s"] + "|" + df["t"] ).str.replace(r"(\|_T)+$", "", regex=True) ) .drop(["e", "s", "t"], axis=1) .set_index(s_all.index.names)[0] .rename("value") ) log.info(f'{len(s_emi_update)} obs to update for Variable="Emission|…"') # Likewise for q_fe_update dim = {"UNIT": [f"{q_fe_update.units:~}".replace("EJ / a", "EJ/yr")]} s_fe_update = ( q_fe_update.expand_dims(dim=dim) .pipe(_expand) .to_frame() .reset_index() .assign( Variable=lambda df: ("Final Energy|" + df["t"] + "|" + df["c"]).str.replace( r"\|$", "", regex=True ) ) .drop(["c", "t"], axis=1) .set_index(s_all.index.names)[0] .rename("value") ) log.info(f'{len(s_fe_update)} obs to update for Variable="Final Energy|…"') # - Concatenate s_all, s_emi_update, and s_fe_update as columns of a data frame. # The result has the superset of the indices of the arguments. # - Fill along axes. Values from s_*_update end up in the last column. # - Select the last column. # - Reshape to wide format. # - Rename index levels and restore to columns. return ( pd.concat([s_all, s_emi_update, s_fe_update], axis=1) .ffill(axis=1) .iloc[:, -1] .unstack("y") .reorder_levels(["Model", "Scenario", "Region", "Variable", "Unit"]) .reset_index() )
[docs] @minimum_version("message_ix_models.model.transport.build.get_computer") def get_computer( row0: "pd.Series", method: METHOD, *, platform_name: Optional[str] = None ) -> "Computer": """Prepare `c` to process aviation emissions data. Parameters ---------- row0 : A single sample row of the input data. "Model" and "Scenario" must be in the index; these are used to reconstruct the IAMC data structure. method : Select the calculation method. platform_name : Configured name of a :class:`.Platform` containing solved MESSAGEix-Transport scenarios. Returns ------- Computer Calling :py:`c.get("target")` triggers the calculation of the result. """ from message_ix_models.model import Config as ModelConfig from message_ix_models.model.transport import Config as TransportConfig from message_ix_models.model.transport import workflow # Create a Computer instance c = genno.Computer() c.require_compat("message_ix_models.report.operator") # Create a Context instance. Only R12 is supported. context = Context(model=ModelConfig(regions="R12")) # Store in `c` for reference by other operations c.add("context", context) c.graph["config"].update(regions="R12") # Store a model name and scenario name from a single row of the data model_name, scenario_name = row0[["Model", "Scenario"]] c.add("model name", genno.quote(model_name)) c.add("scenario name", genno.quote(scenario_name)) # For method_C context.core.dest_scenario["model"] = "ci nightly" context.core.platform_info.setdefault("name", platform_name or "ixmp-dev") context.report.register("model.transport") # For method_C, identify the URL of a solved MESSAGEix-Transport scenario from which # to retrieve transport data. These steps mirror .transport.workflow.generate(). # Retrieve a Code with annotations describing the transport scenario. sc = get_scenario_code(model_name, scenario_name) # - Create and store a .transport.Config instance. # - Update it using the `sc`. # - Retrieve a 'label' used to construct a target scenario URL. label_full = TransportConfig.from_context(context).use_scenario_code(sc)[1] # Construct the target scenario URL url = workflow.scenario_url(context, label_full) # Optionally apply a regex substitution URL_SUB = { "LED-SSP1": ("$", "#162"), # Point to a specific version "LED-SSP2": ("$", "#171"), "SSP1": ("$", "#771"), "SSP2": ("$", "#869"), "SSP3": ("$", "#686"), "SSP4": ("$", "#639"), "SSP5": ("$", "#649"), # "SSP5": ("(SSP_2024.5) baseline$", r"\1 baseline#525"), # Other scenario name } if pattern_repl := URL_SUB.get(sc.id): url = re.sub(pattern_repl[0], pattern_repl[1], url) # Use the URL to update context.core.scenario_info context.handle_cli_args(url=url) log.info(f"method 'C' will use data from {url}") # Common structure and utility quantities used by method_[ABC] c.add(K.bcast, broadcast_st_emi, version=3, include_international=method == "A") c.add(K.units, e_UNIT, "e::codelist") c.add(K.bcast_other, broadcast_est_emi_other, K.units) # Placeholder for data-loading task. This is filled in later by process_df() or # process_file(). c.add(K.input, None) # Select and transform data matching EXPR_EMI # Filter on "VARIABLE", extract the (e, t) dimensions c.add(K.emi_in[0], "select_expand", K.input, dim_cb={"VARIABLE": v_to_emi_coords}) # Assign units c.add(K.emi_in, "assign_units", K.emi_in[0], units="Mt/year") # Select and transform data matching EXPR_FE # Filter on "VARIABLE", extract the (c, t) dimensions dim_cb = {"VARIABLE": v_to_fe_coords} c.add(K.fe_in[0] * "UNITS", "select_expand", K.input, dim_cb=dim_cb) # Convert "UNIT" dim labels to Quantity.units c.add(K.fe_in[1], "unique_units_from_dim", K.fe_in[0] * "UNITS", dim="UNIT") # Change labels e.g. "AFR" → "R12_AFR"; see get_label() c.add(K.fe_in, "relabel", K.fe_in[1], labels=get_labels()) # Call a function to prepare the remaining calculations up to K.emi method_func = {METHOD.A: method_A, METHOD.B: method_B, METHOD.C: method_C}[method] method_func(c) # Adjust the original data by adding the (maybe negative) prepared values at K.emi c.add(K.emi["adj"], "add", K.emi_in, K.emi) c.add(K.fe_out["adj"], "add", K.fe_in[1], K.fe_out) # Add a key "target" to: # - Collapse to IAMC "VARIABLE" dimension name. # - Recombine with other/unaltered original data. c.add( "target", finalize, K.input, K.emi["adj"], K.fe_out["adj"], "model name", "scenario name", ) return c
[docs] @cache def get_labels(): """Return mapper for relabelling input data: - c[ommodity]: 'Liquids|Oil' (IAMC 'variable' component) → 'lightoil'. - n[ode]: "AFR" → "R12_AFR" etc. "World" is not changed. """ cl = get_codelist("node/R12") labels = dict( c={"Electricity": "electr", "Liquids|Oil": "lightoil", "": "_T"}, n={} ) for n in filter(lambda n: len(n.child) and n.id != "World", cl): labels["n"][n.id.partition("_")[2]] = n.id return labels
[docs] def get_scenario_code(model_name: str, scenario_name: str) -> "sdmx.model.common.Code": """Return a specific code from ``CL_TRANSPORT_SCENARIO``. See :func:`.get_cl_scenario`. This function handles (`model_name`, `scenario_name`) combinations seen in base model outputs as of 2025-04-02. """ from message_ix_models.model.transport.config import get_cl_scenario model_parts = model_name.split("_") if model_parts[:2] == ["SSP", "LED"]: code_id = "LED-SSP2" if scenario_name.startswith("SSP2") else "LED-SSP1" else: code_id = model_parts[1] return get_cl_scenario()[code_id]
[docs] def method_A(c: "Computer") -> None: """Prepare calculations up to :data:`K.emi` using :data:`METHOD.A`. This method uses a fixed share of data for variable=``Emissions|*|Energy|Demand|Transportation``. 1. Select data with variable names matching :data:`EXPR_EMI`. 2. Calculate (identical) values for: - ``Emissions|*|Energy|Demand|Transportation|Aviation`` - ``Emissions|*|Energy|Demand|Transportation|Aviation|International`` …as the product of :func:`aviation_emi_share` and ``Emissions|*|Energy|Demand|Transportation``. 3. Subtract (2) from: ``Emissions|*|Energy|Demand|Transportation|Road Rail and Domestic Shipping`` """ # Select the total indexers = dict(t="Transportation") c.add(K.emi[0] / "t", "select", K.emi_in, indexers=indexers, drop=True) # Retrieve the aviation share of emissions k_share = Key("emi share", tuple("eny"), L) c.add(k_share, aviation_emi_share, K.emi_in) # - (emission total) × (aviation share) → emissions of aviation # - Re-add the "t" dimension with +ve sign for "Aviation" and -ve sign for "Road # Rail and Domestic Shipping" c.add(K.emi, "mul", K.emi[0] / "t", k_share, K.bcast) # No change to final energy data c.add(K.fe_out, genno.Quantity(0.0, units="EJ / a"))
[docs] def method_B(c: "Computer") -> None: """Prepare calculations up to :data:`K.emi` using :data:`METHOD.B`. This method uses the |y0| share of aviation in total transport final energy as indicated by :class:`.IEA_EWEB`, with dimensions :math:`(c, n)`, to disaggregate total final energy from the input data, then applies emission intensity data to compute aviation emissions. Excluding data transformations, units, and other manipulations for alignment: 1. From the :class:`.IEA_EWEB` 2024 edition, select data for :math:`y = 2019`. 2. Aggregate IEA EWEB data to align with MESSAGEix-GLOBIOM |c|. 3. Compute the ratio of ``_1`` to ``_2`` (see :func:`.web.transform_C` for how these labels are produced). This is the share of aviation in final energy. 4. Add the steps from :func:`.method_BC_common`. """ from message_ix_models.model.transport import build context: Context = c.graph["context"] # Add the same structure information and exogenous data used in the build and report # workflow steps for MESSAGEix-Transport, in particular: # - e::codelist # - groups::iea to transport # - energy::n-y-product-flow:iea —using .tools.iea.web.IEA_EWEB build.get_computer(context, c) # Shorthand for keys and sequences of keys fe = Keys( cnt=f"energy:c-n-t:{L}+0", iea="energy:n-product-flow:iea", share=f"fe share:c-n:{L}", ) # Prepare data from IEA EWEB: the share of aviation in transport consumption of each # 'c[ommodity]' # Select data for 2019 only c.add(fe.iea[0], "select", fe.iea * "y", indexers=dict(y=2019), drop=True) # Only use the aggregation on the 'product' dimension, not on 'flow' g = Key("groups:p:iea to transport") c.add(g, lambda d: dict(product=d["product"]), "groups::iea to transport") # Aggregate IEA 'product' dimension for alignment to MESSAGE 'c[ommodity]' c.add(fe.iea[1], "aggregate", fe.iea[0], g, keep=False) # Rename dimensions c.add(fe.cnt[0], "rename_dims", fe.iea[1], name_dict=dict(flow="t", product="c")) # Global total c.add("n::world agg", "nodes_world_agg", "config", dim="n", name=None) c.add(fe.cnt[1], "aggregate", fe.cnt[0], "n::world agg", keep=False) # Ratio of _1 (DOMESAIR - AVBUNK) to _2 (TOTTRANS - AVBUNK) c.add(fe.share[0], "select", fe.cnt[1], indexers=dict(t="_1"), drop=True) c.add(fe.share[1], "select", fe.cnt[1], indexers=dict(t="_2"), drop=True) c.add(fe.share, "div", fe.share[0], fe.share[1]) # Prepare remaining calculations method_BC_common(c, fe.share)
[docs] def method_BC_common( c: "Computer", k_fe_share: "Key", k_emi_share: Optional["Key"] = None ) -> None: """Common steps for :func:`.method_B` and :func:`.method_C`. 1. From the input data (:data:`K.input`), select the values matching :data:`EXPR_FE`, that is, final energy use by aviation. 2. Load emissions intensity of aviation final energy use from the file :ref:`transport-input-emi-intensity`. 3. Multiply (k_fe_share) × (1) × (2) to compute the estimate of aviation emissions. 4. Estimate adjustments according to :func:`broadcast_t`. Parameters ---------- k_fe_share A key with dimensions either :math:`(c, n)` or :math:`(c, n, y)` giving the share of aviation in total transport final energy. k_emi_share A key giving the share of aviation in total transport emissions. """ from message_ix_models.model.transport.key import exo # Check dimensions of k_fe_share exp = {frozenset("cn"), frozenset("cny")} if set(k_fe_share.dims) not in exp: # pragma: no cover raise ValueError(f"Dimensions of k_cn={k_fe_share.dims} are not in {exp}") # Shorthand for keys and sequences of keys k = Keys( ei=exo.emi_intensity, # Dimensions (c, e, t) emi0=f"emission:c-e-n-y:{L}", fe=f"fe:c-n-y:{L}+BC", ) ### Compute estimate of emissions # Select only total transport consumption of lightoil from K.fe_in indexers = {"t": "Transportation (w/ bunkers)"} c.add(k.fe[0], "select", K.fe_in, indexers=indexers, drop=True) # Product of aviation share and FE of total transport → FE of aviation c.add(k.fe, "mul", k.fe[0], k_fe_share) # Convert exogenous emission intensity data to Mt / EJ c.add(k.ei["units"], "convert_units", k.ei, units="Mt / EJ") to_mul = [k.fe, k.ei["units"]] # Disabled; see https://github.com/iiasa/message-ix-models/issues/387 if False: # pragma: no cover to_mul.append(track_GAINS(c)) # - (FE of aviation) × (emission intensity) × (adjustment) → emissions of aviation # - Drop/partial sum over 1 label ("AIR") on dimension "t". c.add(k.emi0[0], "mul", *to_mul, sums=True) # Convert units to megatonne per year c.add(k.emi0[1], "convert_units", k.emi0[0], units="Mt / year", sums=True) # - Add "UNIT" dimension and adjust magnitudes for species where units must be kt. # See e_UNIT(). # - Drop/partial sum over dimension "c". c.add(K.emi[2], "mul", k.emi0[1] / "c", K.units) # Re-add the (s, t) dimensions with +ve and -ve signs for certain labels c.add(K.emi[3], "mul", K.emi[2], K.bcast) to_concat = [K.emi[3]] if k_emi_share: # pragma: no cover —only for METHOD.C # Adjust total transportation emissions: multiply k_fe_share by input data # TODO Also try k_emi_share here c.add(K.emi_in["all"], "select", K.emi_in, indexers={"t": ["Transportation"]}) c.add(K.emi[4], "mul", K.emi_in["all"], k_fe_share, -1.0) to_concat.append(K.emi[4]) # Adjust totals beyond transportation c.add(K.emi[5], "add", K.emi[2], K.emi[4] / ("s", "t")) c.add(K.emi[6], "select", K.emi[5], indexers={"n": ["World"]}) c.add(K.emi[7], "mul", K.emi[6], K.bcast_other) to_concat.append(K.emi[7]) else: pass # Concatenate emissions values to be modified c.add(K.emi[8], "concat", *to_concat) # Restore labels: "R12_AFR" → "AFR" etc. "World" is not changed. labels = dict(n={v: k for k, v in get_labels()["n"].items()}) c.add(K.emi, "relabel", K.emi[8], labels=labels) # Re-add the "t" dimension with +ve and -ve sign for certain labels c.add(K.fe_out[0], "mul", k.fe, broadcast_t_fe()) c.add(K.fe_out[1], "drop_vars", K.fe_out[0] * "c_new", names="c") c.add(K.fe_out[2], "rename_dims", K.fe_out[1], name_dict={"c_new": "c"}) # Restore labels: "R12_AFR" → "AFR" etc. "World" is not changed. c.add(K.fe_out[3], "relabel", K.fe_out[2], labels=labels) # Drop data for y0 c.add(K.fe_out, "select", K.fe_out[3], indexers=dict(y=[2020]), inverse=True)
[docs] def method_C(c: "Computer") -> None: # pragma: no cover """Prepare calculations up to :data:`K.emi` using :data:`METHOD.C`. This method uses a solved MESSAGEix-Transport scenario to compute the share of aviation in total transport final energy, with dimensions :math:`(c, n, y)`, and the proceeds similarly to :func:`method_B`. Excluding data transformations, units, and other manipulations for alignment: 1. Identify a corresponding base scenario of MESSAGEix-Transport with a solution. 2. From the model solution data, compute the share of `AIR` in total transport final energy. 3. Apply the steps from :func:`.method_BC_common`. """ from message_ix_models.report import prepare_reporter from message_ix_models.util.genno import update_computer context: Context = c.graph["context"] # - Prepare a Reporter to retrieve model solution data from `target_url`. # - Transfer all its tasks to `c` update_computer(c, prepare_reporter(context)[0]) # Prepare `c` to compute the final energy share for aviation k = Keys( # Added by .transport.base.prepare_reporter() base="in:n-t-y-c:transport+units", share0=f"fe share:c-n-y:{L}", share1=f"fe share:n-y:{L}+ex AIR+ex electr", ) # Rename dimensions as expected by method_BC_common c.add( k.base[0], "rename_dims", "in:nl-t-ya-c:transport+units", name_dict={"nl": "n", "ya": "y"}, ) # Relabel "R12_GLB" (added by .report.transport.aggregate()) to "World" labels = {"n": {"R12_GLB": "World"}} c.add(k.base[1], "relabel", k.base[0], labels=labels, sums=True) # Select the numerator; drop the 't' dimension c.add(k.share0["num"], "select", k.base[1], indexers=dict(t="AIR"), drop=True) # Ratio of AIR to the total c.add(k.share0, "div", k.share0["num"], k.base[1] / "t") # Ratio of FE ex c=electr for (numerator) all modes except AIR and (denominator) # all modes # Common: select all except c="electr" idx = dict(c=["electr"]) c.add( k.share1[0] * ("t", "c"), "select", k.base[1], indexers=idx, inverse=True, sums=True, ) # Denominator: all modes idx = {"t": ["AIR"]} c.add(k.share1["denom"] * "t", "select", k.share1[0] * "t", indexers=idx, sums=True) # Numerator: all modes except "AIR" idx = {"t": ["F", "P"]} c.add(k.share1["num"] * "t", "select", k.share1[0] * "t", indexers=idx, sums=True) # Ratio of numerator/denominator c.add(k.share1, "div", k.share1["num"], k.share1["denom"]) method_BC_common(c, k.share0, k.share1)
[docs] def process_df( data: pd.DataFrame, *, method: METHOD = METHOD.B, platform_name: Optional[str] = None, ) -> pd.DataFrame: """Process `data`. Same as :func:`process_file`, except the data is returned as a data frame in the same structure as `data`. For the meaning of parameters `method` and `platform_name`, see :func:`get_computer`. """ # Prepare all other tasks c = get_computer(data.iloc[0, :], method, platform_name=platform_name) def fillna(df: pd.DataFrame) -> pd.DataFrame: """Replace :py:`np.nan` with 0.0 in certain rows and columns.""" mask = df.Variable.str.fullmatch( r"Emissions\|[^\|]+\|Energy\|Demand\|(Bunkers|Transportation).*" ) to_fill = {c: 0.0 for c in df.columns if str(c).isnumeric() and int(c) >= 2020} return df.where(~mask, df.fillna(to_fill)) # Input data: replace NaN with 0 c.add(K.input[0], fillna, data) # Convert `data` to a Quantity with the appropriate structure c.add(K.input, to_quantity, K.input[0], **IAMC_KW) # Compute and return the result return c.get("target")
[docs] def process_file( path_in: "pathlib.Path", path_out: "pathlib.Path", *, method: METHOD, platform_name: Optional[str] = None, ) -> None: """Process data from file. 1. Read input data from `path_in` in IAMC CSV format. 2. Call :func:`get_computer` and in turn one of :func:`method_A`, :func:`method_B`, or :func:`method_C` according to the value of `method`. 3. Write to `path_out` in the same format as (1). Parameters ---------- path_in : Input data path. path_out : Output data path. method : One of :class:`METHOD`. """ # Peek at `path` for a row containing the model and scenario names row0 = pd.read_csv(path_in, nrows=1).iloc[0, :] # Prepare all other tasks c = get_computer(row0, method) # Input data: read from `path_in` c.add(K.input, iamc_like_data_for_query, path=path_in, **IAMC_KW) # Execute, write the result to `path_out` c.get("target").to_csv(path_out, index=False)
[docs] def track_GAINS(c: "Computer") -> "Key": """Prepare `c` to compute adjustment factor to track declining GAINS EF.""" k, _t, U = Key(f"ADJ:n-e-y-UNIT:{L}"), "Transportation (w/ bunkers)", "UNIT" # Numerator: setelect total emissions from input c.add(k["n0"], "select", K.emi_in, indexers={"t": "Transportation"}, drop=True) # Relabel e.g. "AFR" → "R12_AFR" c.add(k["num"], "relabel", k["n0"], labels=get_labels()) # Denominator: select transport final energy total, subtract electricity c.add(k["d0"], "select", K.fe_in, indexers={"t": _t, "c": "_T"}, drop=True) c.add(k["d1"], "select", K.fe_in, indexers={"t": _t, "c": "electr"}, drop=True) c.add(k["denom"], "sub", k["d0"], k["d1"]) # Compute ratio → implied emission factor # NB This contains y labels prior to 2020, but the other inputs to k.emi0[0], # below, do not, so these are effectively ignored. c.add(k["ratio"] / U, "div", k["num"] / U, k["denom"]) # Index to y=2020 values c.add(k["index"] / U, "index_to", k["ratio"] / U, quote({"y": 2020})) # Clip values to be <= 1 c.add(k / U, lambda q: q.clip(None, 1.0), k["index"] / U) return k / U
[docs] @cache def v_to_fe_coords(value: Hashable) -> Optional[dict[str, str]]: """Match ‘variable’ names codes using :data:`EXPR_FE`. For use with :func:`.select_expand`. """ if match := EXPR_FE.fullmatch(str(value)): return match.groupdict() else: return None
[docs] @cache def v_to_emi_coords(value: Hashable) -> Optional[dict[str, str]]: """Match ‘variable’ names codes using :data:`EXPR_EMI`. For use with :func:`.select_expand`. """ if match := EXPR_EMI.fullmatch(str(value)): result = match.groupdict() result["s"] = result["s"] or "_T" result["t"] = result["t"] or "_T" return result else: return None