"""Postprocess aviation emissions for SSP 2024."""
import logging
import re
from collections.abc import Hashable
from functools import cache
from typing import TYPE_CHECKING, Literal, Optional
import genno
import pandas as pd
from genno import Key
from genno.core.key import single_key
from message_ix_models import Context
from message_ix_models.model.structure import get_codelist
from message_ix_models.tools.iamc import iamc_like_data_for_query, to_quantity
from message_ix_models.util import minimum_version
if TYPE_CHECKING:
import pathlib
import sdmx.model.common
from genno import Computer
from genno.types import AnyQuantity, KeyLike, TQuantity
log = logging.getLogger(__name__)
#: Dimensions of several quantities.
DIMS = "e n t y UNIT".split()
EXPR_EMI = re.compile(
r"^Emissions\|(?P<e>[^\|]+)\|Energy\|Demand\|(?P<t>(Bunkers|Transportation).*)$"
)
EXPR_FE = re.compile(r"^Final Energy\|Transportation\|(?P<c>Liquids\|Oil)$")
#: :class:`.IEA_EWEB` flow codes used in the current file.
FLOWS = ["AVBUNK", "DOMESAIR", "TOTTRANS"]
#: Common label / :attr:`.Key.name`
L = "AIR emi"
[docs]
def aviation_share(ref: "TQuantity") -> "TQuantity":
"""Return (dummy) data for the share of aviation in emissions.
Currently this returns exactly the value `0.2`.
Parameters
----------
ref :
Reference quantity. The dimensions and coordinates :math:`(n, e, y)` of the
returned value exactly match `ref`.
Returns
-------
genno.Quantity
with dimensions :math:`(n, e, y)`.
"""
return (
genno.Quantity(0.2, units="dimensionless")
.expand_dims({"e": sorted(ref.coords["e"].data)})
.expand_dims({"n": sorted(ref.coords["n"].data)})
.expand_dims({"y": sorted(ref.coords["y"].data)})
)
[docs]
def broadcast_t(version: Literal[1, 2], include_international: bool) -> "AnyQuantity":
"""Quantity to re-add the |t| dimension.
Parameters
----------
version :
Version of ‘variable’ names supported by the current module.
include_international :
If :any:`True`, include "Transportation|Aviation|International" with magnitude
1.0. Otherwise, omit.
Return
------
genno.Quantity
with dimension "t".
If :py:`version=1`, the values include:
- +1.0 for t="Transportation|Aviation", a label with missing data.
- -1.0 for t="Transportation|Road Rail and Domestic Shipping", a label with
existing data from which the aviation total must be subtracted.
If :py:`version=2`, the values include:
- +1.0 for t="Bunkers" and t="Bunkers|International Aviation", labels with zeros
in the input data file.
- -1.0 for t="Transportation" and t="Transportation|Road Rail and Domestic
Shipping", labels with existing data from which the aviation total must be
subtracted.
"""
if version == 1:
value = [1, -1, 1]
t = [
"Transportation|Aviation",
"Transportation|Road Rail and Domestic Shipping",
"Transportation|Aviation|International",
]
idx = slice(None) if include_international else slice(-1)
elif version == 2:
value = [1, 1, -1, -1]
t = [
"Bunkers",
"Bunkers|International Aviation",
"Transportation",
"Transportation|Road Rail and Domestic Shipping",
]
idx = slice(None)
return genno.Quantity(value[idx], coords={"t": t[idx]})
[docs]
def e_UNIT(cl_emission: "sdmx.model.common.Codelist") -> "AnyQuantity":
"""Return a quantity for broadcasting.
Returns
-------
genno.Quantity
with one value :math:`Q_{e, UNIT} = 1.0` for every label |e| in
`cl_emission`, with "UNIT" being the unit expression to be used with IAMC-
structured data.
Values are everywhere 1.0, except for species such as ``N2O`` that must be
reported in kt rather than Mt.
"""
data = []
for e in cl_emission:
try:
label = str(e.get_annotation(id="report").text)
except KeyError:
label = e.id
try:
unit = str(e.get_annotation(id="units").text)
except KeyError:
unit = "Mt"
data.append([e.id, f"{unit} {label}/yr", 1.0 if unit == "Mt" else 1e3])
dims = "e UNIT value".split()
return genno.Quantity(
pd.DataFrame(data, columns=dims).set_index(dims[:-1])[dims[-1]]
)
[docs]
def finalize(
q_all: "TQuantity", q_update: "TQuantity", model_name: str, scenario_name: str
) -> pd.DataFrame:
"""Finalize output.
1. Reattach "Model" and "Scenario" labels.
2. Reassemble the "Variable" dimension/coords of `q_update`; drop "e" and "t".
3. Convert both `q_all` and `q_update` to :class:`pandas.Series`; update the former
with the contents of the latter. This retains all other, unmodified data in
`q_all`.
4. Adjust to IAMC ‘wide’ structure.
Parameters
----------
q_all :
All data. Quantity with dimensions :math:`(n, y, UNIT, VARIABLE)`.
q_update :
Revised data to overwrite corresponding values in `q_all`. Quantity with
dimensions :data:`DIMS`.
"""
def _expand(qty):
return qty.expand_dims(
{"Model": [model_name], "Scenario": [scenario_name]}
).rename({"n": "Region", "UNIT": "Unit", "VARIABLE": "Variable"})
# Convert `q_all` to pd.Series
s_all = q_all.pipe(_expand).to_series()
# - Convert `q_update` to pd.Series
# - Reassemble "Variable" codes.
# - Drop dimensions (e, t).
# - Align index with s_all.
s_update = (
q_update.pipe(_expand)
.to_frame()
.reset_index()
.assign(
Variable=lambda df: "Emissions|" + df["e"] + "|Energy|Demand|" + df["t"]
)
.drop(["e", "t"], axis=1)
.set_index(s_all.index.names)[0]
.rename("value")
)
log.info(f"{len(s_update)} obs to update")
# Update `s_all`. This yields an 'outer join' of the original and s_update indices.
s_all.update(s_update)
return (
s_all.unstack("y")
.reorder_levels(["Model", "Scenario", "Region", "Variable", "Unit"])
.reset_index()
)
[docs]
@minimum_version("genno 1.28")
def prepare_computer(c: "Computer", k_input: Key, method: str) -> "KeyLike":
"""Prepare `c` to process aviation emissions data.
Returns
-------
str
"target". Calling :py:`c.get("target")` triggers the calculation.
"""
c.require_compat("message_ix_models.report.operator")
# Common structure and utility quantities used by prepare_method_[AB]
c.add(
f"broadcast:t:{L}", broadcast_t, version=2, include_international=method == "A"
)
k_emi_in = Key(L, DIMS, "input")
# Select and transform data matching EXPR_EMI
# Filter on "VARIABLE", expand the (e, t) dimensions from "VARIABLE"
c.add(k_emi_in[0], "select_expand", k_input, dim_cb={"VARIABLE": v_to_emi_coords})
c.add(k_emi_in[1], "assign_units", k_emi_in[0], units="Mt/year")
# Call a function to prepare the remaining calculations
prepare_func = {"A": prepare_method_A, "B": prepare_method_B}[method]
k = prepare_func(c, k_input, k_emi_in.last)
# This should return a key like "*:e-n-t-y-UNIT:*"; check
assert set(DIMS) == set(k.dims), k.dims
# Add to the input data
k_adj = c.add(Key(L, DIMS, "adj"), "add", k_emi_in.last, k)
# - Collapse to IAMC "VARIABLE" dimension name
# - Recombine with other data
c.add("target", finalize, k_input, k_adj, "model name", "scenario name")
return "target"
[docs]
def prepare_method_A(
c: "genno.Computer", k_input: "genno.Key", k_emi_in: "genno.Key"
) -> "genno.Key":
"""Prepare calculations using method 'A'.
1. Select data with variable names matching :data:`EXPR_EMI`.
2. Calculate (identical) values for:
- ``Emissions|*|Energy|Demand|Transportation|Aviation``
- ``Emissions|*|Energy|Demand|Transportation|Aviation|International``
…as the product of :func:`aviation_share` and
``Emissions|*|Energy|Demand|Transportation``.
3. Subtract (2) from:
``Emissions|*|Energy|Demand|Transportation|Road Rail and Domestic Shipping``
"""
# Shorthand
k = Key("result", DIMS)
# Select the total
c.add(k[0] / "t", "select", k_emi_in, indexers={"t": "_T"}, drop=True)
# Retrieve the aviation share of emissions
k_share = Key(f"{L} share", tuple("eny"))
c.add(k_share, aviation_share, k_emi_in)
# - (emission total) × (aviation share) → emissions of aviation
# - Re-add the "t" dimension with +ve sign for "Aviation" and -ve sign for "Road
# Rail and Domestic Shipping"
c.add(k[1], "mul", k_emi_in, k_share, f"broadcast:t:{L}")
return k[1]
[docs]
def prepare_method_B(
c: "genno.Computer", k_input: "genno.Key", k_emi_in: "genno.Key"
) -> "genno.Key":
"""Prepare calculations using method 'B'.
Excluding data transformations, units, and other manipulations for alignment:
1. From the :class:`.IEA_EWEB` 2024 edition, select data for :math:`y = 2019` and
the :data:`FLOWS`.
2. Aggregate IEA EWEB to align with MESSAGEix-GLOBIOM |c|.
3. Reverse the sign of values for flow=AVBUNK. These are negative in the source
data, but their absolute value must be added to values for flow=DOMESAIR.
4. Compute the ratio :math:`(AVBUNK + DOMESAIR) / TOTTRANS`, the share of aviation
in final energy.
5. From the input data (`k_input`), select the values matching :data:`EXPR_FE`, that
is, final energy use by aviation.
6. Load emissions intensity of aviation final energy use from the file
:ref:`transport-input-emi-intensity`.
7. Multiply (4) × (5) × (6) to compute the estimate of aviation emissions.
8. Estimate adjustments according to :func:`broadcast_t`.
9. Adjust `k_emi_in` by adding (7) and (8).
"""
from message_ix_models.model.transport import Config, build
from message_ix_models.model.transport.key import exo
from message_ix_models.tools.exo_data import prepare_computer
# Fetch a Context instance
# NB It is assumed this is aligned with the contents of the input data file
context = Context.get_instance()
# Add the same structure information used in the build and report workflow steps for
# MESSAGEix-Transport, notably <e::codelist> and <groups::iea to transport>
build.get_computer(context, c, options=dict(config=Config()))
# Shorthand for keys and sequences of keys
k_ei = exo.emi_intensity
k_fe_in = Key("fe", ("c", "n", "y", "UNIT"), "input")
k_cnt = Key("energy", ("c", "n", "t"), L)
k_cn = k_cnt / "t"
### Prepare data from IEA EWEB: the share of aviation in transport consumption of
### each 'c[ommodity]'
# Fetch data from IEA EWEB
kw = dict(provider="IEA", edition="2024", flow=FLOWS, transform="B", regions="R12")
k_iea = prepare_computer(context, c, "IEA_EWEB", kw, strict=False)[0]
k_fnp = Key(k_iea / "y") # flow, node, product
# Select data for 2019 only
c.add(k_fnp[0], "select", k_iea, indexers=dict(y=2019), drop=True)
# Only use the aggregation on the 'product' dimension, not on 'flow'
c.add(
"groups:p:iea to transport",
lambda d: {"product": d["product"]},
"groups::iea to transport",
)
# Aggregate IEA 'product' dimension for alignment to MESSAGE 'c[ommodity]'
c.add(k_fnp[1], "aggregate", k_fnp[0], "groups:p:iea to transport", keep=False)
# Rename dimensions
c.add(k_cnt[0], "rename_dims", k_fnp[1], name_dict=dict(flow="t", product="c"))
# Reverse sign of AVBUNK
q_sign = genno.Quantity([-1.0, 1.0, 1.0], coords={"t": FLOWS})
c.add(k_cnt[1], "mul", k_cnt[0], q_sign)
# Compute ratio of ('AVBUNK' + 'DOMESAIR') to 'TOTTRANS'
# TODO Confirm that this or another numerator is appropriate
c.add(k_cnt[2], "select", k_cnt[1], indexers=dict(t=["AVBUNK", "DOMESAIR"]))
c.add(k_cn[0], "sum", k_cnt[2], dimensions=["t"])
c.add(k_cn[1], "select", k_cnt[1], indexers=dict(t="TOTTRANS"), drop=True)
c.add(k_cn[2], "div", k_cn[0], k_cn[1])
### Prepare data from the input data file: total transport consumption of light oil
# Filter on "VARIABLE", extract (e) dimension
c.add(k_fe_in[0], "select_expand", k_input, dim_cb={"VARIABLE": v_to_fe_coords})
# Convert "UNIT" dim labels to Quantity.units
c.add(k_fe_in[1] / "UNIT", "unique_units_from_dim", k_fe_in[0], dim="UNIT")
# Relabel:
# - c[ommodity]: 'Liquids|Oil' (IAMC 'variable' component) → 'lightoil'
# - n[ode]: 'AFR' → 'R12_AFR' etc.
labels = dict(
c={"Liquids|Oil": "lightoil"},
n={n.id.partition("_")[2]: n.id for n in get_codelist("node/R12")},
)
c.add(k_fe_in[2] / "UNIT", "relabel", k_fe_in[1] / "UNIT", labels=labels)
### Compute estimate of emissions
# Product of aviation share and FE of total transport → FE of aviation
k_ = c.add(f"{L} fe", "mul", k_fe_in.last / "UNIT", k_cn.last)
# Convert exogenous emission intensity data to Mt / EJ
c.add(k_ei + "conv", "convert_units", k_ei, units="Mt / EJ")
# - (FE of aviation) × (emission intensity) → emissions of aviation.
# - Drop/partial sum over 1 label ("AIR") on dimension "t".
_ = single_key(c.add(f"{L}::0", "mul", k_, k_ei + "conv", sums=True))
k_ = Key(_) / "t"
# Convert units to megatonne per year
k_ = c.add(Key(L, k_.dims, "1"), "convert_units", k_, units="Mt / year")
# - Add "UNIT" dimension and adjust magnitudes for species where units must be kt.
# See e_UNIT().
# - Re-add the "t" dimension with +ve sign for "Aviation" and -ve sign for "Road
# Rail and Domestic Shipping".
# - Drop/partial sum over dimension "c".
k_units = c.add(Key(f"{L} units:e-UNIT"), e_UNIT, "e::codelist")
_ = single_key(c.add(f"{L}::2", "mul", k_, k_units, f"broadcast:t:{L}"))
k_ = Key(_) / "c"
# Change labels: restore e.g. "AFR" given "R12_AFR"
labels = dict(n={v: k for k, v in labels["n"].items()})
k_result = single_key(c.add(Key(L, k_.dims, "3"), "relabel", k_, labels=labels))
return Key(k_result)
[docs]
def process_df(data: pd.DataFrame, *, method: str = "B") -> pd.DataFrame:
"""Process `data`.
Same as :func:`process_file`, except the data is returned as a data frame in the
same structure as `data`.
"""
c = genno.Computer()
# Peek at `data` to identify the model and scenario names
c.add("model name", genno.quote(data["Model"].iloc[0]))
c.add("scenario name", genno.quote(data["Scenario"].iloc[0]))
# Convert `data` to a Quantity with the appropriate structure
k_input = genno.Key("input", ("n", "y", "VARIABLE", "UNIT"))
c.add(
k_input,
to_quantity,
data,
non_iso_3166="keep",
query="Model != ''",
unique="MODEL SCENARIO",
)
# Prepare all other tasks
prepare_computer(c, k_input, method)
# Compute and return the result
return c.get("target")
[docs]
def process_file(
path_in: "pathlib.Path", path_out: "pathlib.Path", *, method: str
) -> None:
"""Process data from file.
1. Read input data from `path_in` in IAMC CSV format.
2. Call :func:`prepare_computer` and in turn either :func:`prepare_method_A` or
:func:`prepare_method_B` according to the value of `method`.
3. Write to `path_out` in the same format as (1).
Parameters
----------
path_in :
Input data path.
path_out :
Output data path.
method :
Either 'A' or 'B'.
"""
c = genno.Computer()
# Read the data from `path`
k_input = genno.Key("input", ("n", "y", "VARIABLE", "UNIT"))
c.add(
k_input,
iamc_like_data_for_query,
path=path_in,
non_iso_3166="keep",
query="Model != ''",
unique="MODEL SCENARIO",
)
# Peek at `path` to identify the model and scenario names
df = pd.read_csv(path_in, nrows=1)
c.add("model name", genno.quote(df["Model"].iloc[0]))
c.add("scenario name", genno.quote(df["Scenario"].iloc[0]))
prepare_computer(c, k_input, method)
# Execute, write the result back to file
c.get("target").to_csv(path_out, index=False)
[docs]
@cache
def v_to_fe_coords(value: Hashable) -> Optional[dict[str, str]]:
"""Match ‘variable’ names used in :func:`main`."""
if match := EXPR_FE.fullmatch(str(value)):
return match.groupdict()
else:
return None
[docs]
@cache
def v_to_emi_coords(value: Hashable) -> Optional[dict[str, str]]:
"""Match ‘variable’ names used in :func:`main`."""
if match := EXPR_EMI.fullmatch(str(value)):
return match.groupdict()
else:
return None