"""Data preparation for the MESSAGEix-GLOBIOM base model."""
from functools import partial
from itertools import pairwise
from typing import TYPE_CHECKING, Any, Union
import numpy as np
import pandas as pd
from genno import Computer, KeySeq
from genno.core.key import single_key
from message_ix_models.util import minimum_version
from .key import gdp_exo
if TYPE_CHECKING:
import genno
import message_ix
from genno.core.key import KeyLike
from genno.types import AnyQuantity
SCALE_1_HEADER = """Ratio of MESSAGEix-Transport output to IEA EWEB data.
- `t` (technology) codes correspond to IEA `FLOW` codes or equivalent aggregates
across groups of MESSAGEix-Transport technologies.
- `c` (commodity) codes correspond to MESSAGEix-GLOBIOM commodities or equivalent
aggregates across groups of IEA `PRODUCT` codes.
"""
SCALE_2_HEADER = """Ratio of scaled MESSAGEix-Transport output to IEA EWEB data.
The numerator used to compute this scaling factor is the one corrected by the values in
scale-1.csv.
"""
UE_SHARE_HEADER = (
"""Portion of useful energy output by each t within (nl, ya) groups."""
)


@minimum_version("pandas 2")
def smooth(c: Computer, key: "genno.Key", *, dim: str = "ya") -> "genno.Key":
"""Implement ‘smoothing’ for `key` along the dimension `dim`.
1. Identify values which do not meet a certain criterion. Currently the criterion
is: the first contiguous sequence of values that are lower than the corresponding
value in :math:`y_A = y_0` (e.g. in 2020).
2. Remove those values.
3. Fill by linear interpolation.
"""
    from genno import Quantity

    assert key.tag != "2"
    ks = KeySeq(key.remove_tag(key.tag or ""))

    def first_block_false(column: pd.Series) -> pd.Series:
        """Modify `column` to contain at most one contiguous block of :data:`False`."""
# Iterate over values pairwise to identify start and end indices of a block
i_start = i_end = 0
for i, pair in enumerate(pairwise(column.values), start=1):
if pair == (True, False): # Start of a block of False values
i_start = i
elif pair == (False, True): # End of a block
i_end = i
break # Don't examine further values
if i_start > 0 and i_end > 0: # Block of False values with a start and end
column.iloc[i_end:] = True # Fill with True after the end of the block
elif i_start > 0: # Block of False values *without* end
column.iloc[i_start:] = True # Erase the entire block by overwriting
return column
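
    # Illustration of first_block_false(): a mask like
    #   [True, False, False, True, False, True]
    # keeps only its first block of False values (indices 1–2); the later False
    # (index 4) is overwritten with True, giving
    #   [True, False, False, True, True, True]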

    def clip_nan(qty: "AnyQuantity", coord: Any) -> "AnyQuantity":
        """Clip values below the `dim`=`coord` value, replacing with :any:`numpy.nan`.

        Only the first contiguous block of values below this threshold is clipped.
        """
# Dimensions other than `dim`
others = list(qty.dims)
others.remove(dim)
# - Select the threshold values for `dim`=`coord`; broadcast to all `dim`.
# - Merge with `qty` values.
# - Reorder and sort index.
# - Compute condition for clipping.
# - Return clipped values.
return Quantity(
qty.sel({dim: coord})
.expand_dims({dim: qty.coords[dim]})
.to_series()
.rename("threshold")
.to_frame()
.merge(qty.to_series().rename("value"), left_index=True, right_index=True)
.reorder_levels(list(qty.dims))
.sort_index()
.where(
lambda df: (df.value >= df.threshold)
.groupby(others, group_keys=False)
.apply(first_block_false)
)["value"]
)
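
    # Illustration of clip_nan(), with assumed values: if the threshold q(ya=2020) is
    # 10 and the series along ya is [10, 8, 9, 12, 7], the condition value >= 10 gives
    # [T, F, F, T, F]; first_block_false() keeps only the first False block, so the
    # result is [10, NaN, NaN, 12, 7] and the later dip (7) is retained.
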
# Clip the data, removing values below the value for dim=y0
c.add(ks["_clip"], clip_nan, key, "y0")
# Identify the coordinates for interpolation on `dim`
c.add(ks["_ya coords"], lambda qty: {dim: sorted(qty.coords[dim].data)}, key)
# Interpolate to fill clipped data
c.add(
ks[2], "interpolate", ks["_clip"], ks["_ya coords"], method="slinear", sums=True
)
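    # The result ks[2] is `key` with the tag "2" appended, per the assertion above
    # that `key` does not already carry this tag.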
return ks[2]


def prepare_reporter(rep: "message_ix.Reporter") -> str:
"""Add tasks that produce data to parametrize transport in MESSAGEix-GLOBIOM.
Returns a key, "base model data". Retrieving the key results in the creation of 5
files in the reporting output directory for the :class:`.Scenario` being reported
(see :func:`.make_output_path`):
1. :file:`demand.csv`: This contains MESSAGEix-Transport model solution data
transformed into ``demand`` parameter data for a base MESSAGEix-GLOBIOM model—
that is, one without MESSAGEix-Transport. :file:`input-base.csv` is used.
2. :file:`bound_activity_lo.csv`: Same data transformed into ``bound_activity_lo``
parameter data for the transport technologies ("coal_trp", etc.) appearing in the
base model.
.. todo:: Drop ``bound_activity_lo`` values that are equal to zero.
3. :file:`bound_activity_up.csv`: Same values as (2), multiplied by 1.005.
Two files are for diagnosis:
4. :file:`scale-1.csv`: First stage scaling factor used to bring MESSAGEix-Transport
(c, t) totals in correspondence with IEA World Energy Balance (WEB) values.
5. :file:`scale-2.csv`: Second stage scaling factor used to bring overall totals.
"""
from genno import Key, KeySeq, Quantity, quote
    # Keys of CSV-writing tasks, collected under the final key, "base model data"
    targets = []

    def _to_csv(base: "Key", name: str, write_kw: Union[dict, "KeyLike", None] = None):
        """Helper to add computations that output data to CSV."""
# Some strings
csv, path, fn = f"{name} csv", f"{name} path", f"{name.replace(' ', '-')}.csv"
# Output path for this parameter
rep.add(path, "make_output_path", "config", name=fn)
# Write to file
rep.add(csv, "write_report", base, path, write_kw or {})
targets.append(csv)
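
    # For example, `_to_csv(s1[2], "scale 1", ...)` below adds tasks that write
    # scale-1.csv in the reporting output directory.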
# Keys for starting quantities
e_iea = Key("energy:n-y-product-flow:iea")
e_fnp = KeySeq(e_iea.drop("y"))
e_cnlt = Key("energy:c-nl-t:iea+0")
k = KeySeq("in:nl-t-ya-c-l-h:transport+units")
# First period
y0 = rep.get("y0")
# Transform IEA EWEB data for comparison
assert y0 == 2020, f"IEA Extended World Energy Balances: no data for y={y0}"
rep.add(e_fnp[0], "select", e_iea, indexers=dict(y=y0), drop=True)
rep.add(e_fnp[1], "aggregate", e_fnp[0], "groups::iea to transport", keep=False)
rep.add(
e_cnlt,
"rename_dims",
e_fnp[1],
quote(dict(flow="t", n="nl", product="c")),
sums=True,
)
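    # `e_cnlt` is the IEA data for y=y0, relabelled to dimensions (c, nl, t) so that
    # it can be compared directly with MESSAGEix-Transport outputs.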
# Transport outputs for comparison
rep.add(k[0], "select", k.base, indexers=dict(ya=y0), drop=True)
rep.add(k[1], "aggregate", k[0], "groups::transport to iea", keep=False)
# Scaling factor 1: ratio of MESSAGEix-Transport outputs to IEA data
tmp = rep.add("scale 1", "div", k[1], e_cnlt)
s1 = KeySeq(tmp)
rep.add(s1[1], "convert_units", s1.base, units="1 / a")
rep.add(s1[2], "mul", s1[1], Quantity(1.0, units="a"))
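    # Above: the units of the ratio are first converted to 1/a, then cancelled by
    # multiplying with 1.0 a, leaving a dimensionless scaling factor.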
_to_csv(s1[2], s1.name, dict(header_comment=SCALE_1_HEADER))
# Replace ~0 and ∞ values with 1.0; this avoids x / 0 = inf
rep.add(s1[3], "where", s1[2], cond=lambda v: (v > 1e-3) & (v != np.inf), other=1.0)
# Restore original "t" labels to scale-1
rep.add(s1[4], "select", s1[3], "indexers::iea to transport")
rep.add(s1[5], "rename_dims", s1[4], quote(dict(t_new="t")))
    # Interpolate the scaling factor from the computed value at ya=y₀ to 1.0 for
    # ya ≥ 2050
rep.add(s1[6], lambda q: q.expand_dims(ya=[y0]), s1[5])
rep.add(s1[7], lambda q: q.expand_dims(ya=[2050]).clip(1.0, 1.0), s1[5])
rep.add(s1[8], lambda q: q.expand_dims(ya=[2110]).clip(1.0, 1.0), s1[5])
rep.add(s1[9], "concat", s1[6], s1[7], s1[8])
rep.add("ya::coord", lambda v: {"ya": v}, "y::model")
rep.add(
s1[10], "interpolate", s1[9], "ya::coord", kwargs=dict(fill_value="extrapolate")
)
_to_csv(s1[10], f"{s1.name}-blend", dict(header_comment=SCALE_1_HEADER))
    # Correct the MESSAGEix-Transport outputs for the base MESSAGEix-GLOBIOM model
    # using the high-resolution scaling factor
rep.add(k["s1"], "div", k.base, s1[10])
# Scaling factor 2: ratio of total of scaled data to IEA total
rep.add(k[2] / "ya", "select", k["s1"], indexers=dict(ya=y0), drop=True, sums=True)
rep.add(
"energy:nl:iea+transport",
"select",
e_cnlt / "c",
indexers=dict(t="transport"),
drop=True,
)
tmp = rep.add("scale 2", "div", k[2] / ("c", "t", "ya"), "energy:nl:iea+transport")
s2 = KeySeq(tmp)
rep.add(s2[1], "convert_units", s2.base, units="1 / a")
rep.add(s2[2], "mul", s2[1], Quantity(1.0, units="a"))
_to_csv(s2[2], s2.name, dict(header_comment=SCALE_2_HEADER))
# Correct MESSAGEix-Transport outputs using the low-resolution scaling factor
rep.add(k["s2"], "div", k["s1"], s2[2])
# Compute for file and plot: transport final energy intensity of GDP PPP
k_gdp = rep.add("gdp:nl-ya", "rename_dims", gdp_exo, quote({"n": "nl", "y": "ya"}))
k_fei = single_key(rep.add("fe intensity", "div", k["s2"] / tuple("chlt"), k_gdp))
rep.add(k_fei + "units", "convert_units", k_fei, units="MJ / USD")
_to_csv(k_fei + "units", "fe intensity")
# Convert "final" energy inputs to transport to "useful energy" outputs, using
# efficiency data from input-base.csv (in turn, from the base model). This data
# will be used for `demand`.
# - Sum across the "t" dimension of `k` to avoid conflict with "t" labels introduced
# by the data from file.
tmp = rep.add("ue", "div", k["s2"] / "t", "input:t-c-h:base")
ue = KeySeq(tmp)
# Compute shares of useful energy by input
ue_share = rep.add(
"ue::share", "div", ue.base / tuple("chl"), ue.base / tuple("chlt")
)
assert isinstance(ue_share, Key)
# Minimum and maximum shares occurring over the model horizon in each region
rep.add(ue_share + "max", "max", ue_share, dim=("nl", "t"))
rep.add(ue_share + "min", "min", ue_share, dim=("nl", "t"))
_to_csv(ue_share, "ue share", dict(header_comment=UE_SHARE_HEADER))
_to_csv(
ue_share + "max",
"ue share max",
dict(header_comment=UE_SHARE_HEADER + "\n\nMaximum across ya."),
)
_to_csv(
ue_share + "min",
"ue share min",
dict(header_comment=UE_SHARE_HEADER + "\n\nMinimum across ya."),
)
# Ensure units: in::transport+units [=] GWa/a and input::base [=] GWa; their ratio
# gives units 1/a. The base model expects "GWa" for all 3 parameters.
rep.add(ue[1], "mul", ue.base, Quantity(1.0, units="GWa * a"))
_to_csv(ue[1] / ("c", "t"), "demand no fill", {})
# 'Smooth' ue[1] data by interpolating any dip below the base year value
assert rep.apply(smooth, ue[1] / ("c", "t")) == ue[2] / ("c", "t")
# Select only ya=y₀ data for use in `bound_activity_*`
b_a_l = rep.add(Key("b_a_l", ue[2].dims), "select", ue[1], quote(dict(ya=[y0])))
# `bound_activity_up` values are 1.005 * `bound_activity_lo` values
b_a_u = rep.add("b_a_u", "mul", b_a_l, Quantity(1.005))
# Keyword arguments for as_message_df()
args_demand = dict(
dims=dict(node="nl", year="ya", time="h"),
common=dict(commodity="transport", level="useful"),
)
args_bound_activity = dict(
dims=dict(node_loc="nl", technology="t", year_act="ya", time="h"),
common=dict(mode="M1"),
)
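    # Sketch of the resulting layout (based on the mappings above): `dims` maps
    # MESSAGE parameter columns to quantity dimensions (e.g. node_loc ← nl), while
    # `common` fills constant columns such as mode="M1" or level="useful".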
# Add similar steps for each parameter
for name, base_key, args in (
("demand", ue[2] / ("c", "t"), args_demand),
("bound_activity_lo", b_a_l, args_bound_activity),
("bound_activity_lo-projected", ue[1], args_bound_activity),
("bound_activity_up", b_a_u, args_bound_activity),
):
# More identifiers
s = f"base model transport {name}"
key, header = Key(f"{s}::ixmp"), f"{s} header"
# Convert to MESSAGE data structure
rep.add(
key, "as_message_df", base_key, name=name.split("-")[0], wrap=False, **args
)
# Sort values
# TODO Move upstream as a feature of as_message_df()
dims = list(args["dims"])
rep.add(key + "1", partial(pd.DataFrame.sort_values, by=dims), key)
# Header for this file
rep.add(header, "base_model_data_header", "scenario", name=name)
_to_csv(key + "1", name, header)
# Key to trigger all the above
result = "base model data"
rep.add(result, targets)
return result