"""Tools for scaling factors.
.. todo:: Add further :class:`.Layer` subclasses beyond :class:`.Constant` for, for
instance:
- Linear interpolation between given points.
- Exponentials, splines, and other functions.
"""
import logging
import operator
import re
from abc import ABC, abstractmethod
from dataclasses import dataclass, field
from functools import partial
from typing import (
TYPE_CHECKING,
Any,
Callable,
Dict,
List,
Mapping,
Optional,
Sequence,
Tuple,
Union,
)
import pandas as pd
from genno import Computer, Key, Quantity
from genno import operator as g
from message_ix_models.project.ssp import SSP_2024
if TYPE_CHECKING:
import genno.core.key
log = logging.getLogger(__name__)
[docs]class Layer(ABC):
"""Base class for layered assumptions used to build up a factor quantification."""
#: Operation for combining the quantification of this layer with the quantification
#: of the previous layer. Used by :meth:`apply`.
#:
#: Choices might include:
#:
#: - :func:`operator.mul`: take the product of this layer and previous layer's
#: values.
#: - :func:`.operator.pow`: raise the previous layer's values to the values from
#: this layer. This can be used with :py:`0` and :py:`1` to replace certain
#: values in `other` with 1.0, since :py:`other ** 0 = 1.0` and
#: :py:`other ** 1 = other`.
operation: Callable
def __hash__(self) -> int:
return hash(repr(self))
[docs] @abstractmethod
def quantify(self, coords: Mapping[str, Any]) -> Quantity:
"""Return a quantification of the layer.
The returned value:
- **May** have any of the dimensions indicated by `coords`.
- For such dimensions, **should** have some or all of the labels in `coords`.
Subclasses **must** implement this method.
"""
pass
[docs] def apply(self, other: Quantity, coords: Mapping[str, Any]) -> Quantity:
""":meth:`.quantify` this layer and combine it with `other`.
Subclasses **may** override this method. The default implementation combines
`other` with the results of :meth:`quantify` using :attr:`operation`.
"""
return self.operation(other, self.quantify(coords))
[docs]class Constant(Layer):
"""A value that is constant across 1 or more :attr:`dims`.
Parameters
----------
value :
If not :class:`.Quantity`, it is transformed to one.
dims :
Either a sequence of :class:`str`, or a single :class:`str` expression like
"x y z" or "x-y-z", which is split to a sequence.
"""
#: Fixed value.
value: Quantity
#: Dimensions of the result.
dims: Tuple[str, ...]
operation = operator.mul
#: Units.
units: str = "dimensionless"
[docs] def __init__(self, value: Union[float, Quantity], dims: Union[str, Sequence[str]]):
self.value = (
value if isinstance(value, Quantity) else Quantity(value, units=self.units)
)
self.dims = (
tuple(re.split("[ -]", dims)) if isinstance(dims, str) else tuple(dims)
)
[docs] def quantify(self, coords):
result = self.value
# FIXME genno cannot handle multiple dimensions simultaneously
for d in self.dims:
result = result.expand_dims(**{d: coords[d]})
return result
[docs]class Omit(Layer):
"""A layer that excludes 1 or more :attr:`labels` along :attr:`dim`.
Example
-------
>>> factor.Omit(x=["x1", "x3"])
"""
#: Dimension along which to omit :attr:`labels`.
dim: str
#: Specific labels or coords to omit.
labels: list
operation = operator.pow
[docs] def __init__(self, **kwargs):
assert 1 == len(kwargs)
for k, v in kwargs.items():
self.dim = k
self.labels = v
def _mask(self, v_in, v_out, other) -> pd.Series:
"""Return a ‘mask’ for use with :func:`operator.pow`.
The result has `v_in` where entries of `other` are in :attr:`labels`; otherwise
`v_out`.
"""
return pd.Series({x: (v_in if x in self.labels else v_out) for x in other})
[docs] def quantify(self, coords):
return Quantity(self._mask(0.0, 1.0, coords[self.dim]).rename_axis(self.dim))
[docs]class Keep(Omit):
"""A layer that preserves values for 1 or more :attr:`labels` along :attr:`dim`."""
[docs] def quantify(self, coords):
return Quantity(self._mask(1.0, 0.0, coords[self.dim]).rename_axis(self.dim))
[docs]@dataclass
class Map(Layer):
"""A layer that maps to different :attr:`values` along :attr:`dim`.
Parameters
----------
dim :
Dimension ID.
values : optional
Mapping from labels or coords along `dim` to other :class:`.Layers` which
produce the value(s) for those coords.
**value_kwargs :
Same as `values`, but as keyword arguments.
Example
-------
>>> layer = factor.Map(
... "new_dim",
... x=factor.Constant(2.0, "y z"),
... x=factor.Constant(3.0, "y z"),
... )
"""
dim: str
values: Dict[str, Layer]
operation = operator.mul
[docs] def __init__(
self, dim: str, values: Optional[Dict[str, Layer]] = None, **value_kwargs: Layer
):
self.dim = dim
self.values = values or value_kwargs
__hash__ = Layer.__hash__
[docs] def quantify(self, coords):
return g.concat(
*[
v.quantify(coords).expand_dims(**{self.dim: k})
for k, v in self.values.items()
]
)
[docs]class ScenarioSetting(Layer):
"""A layer that transforms a ‘scenario’ identifier to a particular ‘setting’.
This layer handles the common case that multiple ‘scenario’ identifiers may be
represented in a model using the same quantification. It uses coords along a
dimension named ‘setting’ to represent these distinct quantifications.
The :meth:`quantify` and :meth:`apply` methods have special behaviour
"""
#: Mapping from scenario identifier to setting label.
setting: Dict[Any, str]
#: Default setting.
default: str
operation = operator.mul
[docs] def __init__(self, setting: Optional[dict] = None, *, default=None, **setting_kw):
self.setting = setting or setting_kw
self.default = default
def __post_init__(self):
"""Check validity of the setting and values."""
labels0 = set(self.setting.values())
labels1 = set(self.value.keys())
if not labels0 <= labels1:
raise ValueError(
f"Setting labels {labels0} do not match value labels {labels1}"
)
[docs] @classmethod
def of_enum(cls, enum, data_expr: str, **kwargs):
"""Create from simpler data for an enumeration.
Parameters
----------
enum :
An enumeration, for instance :any:`.SSP_2024`.
data_expr : str
A string of the form "A=foo B=bar" indicating that the ‘setting’ for `enum`
member "A" is "foo", and so on.
**kwargs :
Passed to :class:`ScenarioSetting`.
"""
data = dict([item.split("=") for item in data_expr.split()])
# Look `enum` members corresponding to keys of `data`
setting = {enum[key]: value for key, value in data.items()}
if set(setting) != set(enum):
extra = set(enum) - set(setting)
missing = set(setting) - set(enum)
raise ValueError(
f"Scenario identifiers must match members of {enum}; "
f"missing {missing} and/or extra {extra}"
)
# Call the constructor
return cls(setting=setting, **kwargs)
[docs] def quantify(self, coords):
"""Return a quantification of the layer.
The key :py:`"scenario"` is **removed** from `coords`. (This means it is not
available to subsequent layers, and also not included in among the dimensions
of the :meth:`.Factor.quantify` result.)
The value returned is the value 1.0 with the single dimension ‘setting’ and
label obtained by passing the "scenario" coord through :attr:`setting`, or
using :attr:`default` if it does not appear.
"""
scenario = coords.pop("scenario")
try:
setting = self.setting[scenario]
except KeyError:
if not self.default:
print(self.setting)
raise
log.warning(
f"Use default setting {self.default!r} for unrecognized {scenario}"
)
setting = self.default
return Quantity(1.0).expand_dims(setting=setting)
[docs] def apply(self, other, coords):
""":meth:`.quantify` this layer and combine it with `other`.
This drops the ‘setting’ dimension from `other`.
"""
return super().apply(other, coords).drop_vars("setting")
[docs]@dataclass
class Factor:
"""Representation of assumptions used to construct a factor.
The assumptions are stored as a sequence of :attr:`layers`, and combined one by one
to produce a multi-dimensional :class:`.Quantity`.
Factor quantifications can be used in multiplicative, additive, or in other,
possibly more complicated ways.
"""
#: Ordered list of :class:`.Layer`.
layers: List[Layer] = field(default_factory=list)
def __hash__(self):
return hash(tuple(self.layers))
[docs] def quantify(self, **coords) -> Quantity:
"""Return a quantification.
The result will have **at least** the dimensions and labels in `coords`, and
**may** may have additional dimensions not from `coords`.
Parameters
----------
coords :
Target :mod:`xarray`-style coords: dimension IDs mapped to lists of labels.
"""
# Base result: quantify the first layer
result = self.layers[0].quantify(coords)
# Handle each layer in sequence
for layer in self.layers[1:]:
# Compute the quantification of the layer, apply to the previous result
result = layer.apply(result, coords)
# Ensure the result has complete dimensionality and scope
assert set(result.coords) >= set(coords), (result.coords, coords)
for k, v in coords.items():
assert set(v) == set(result.coords[k].data)
return result
# genno connection
[docs] def add_tasks(
self,
c: Computer,
key: "genno.core.key.KeyLike",
*inputs: "genno.Key",
scenario_expr: str,
) -> "genno.core.key.KeyLike":
"""Add a task to `c` to return the quantified Factor."""
dims = tuple(Key(k).name for k in inputs)
return c.add_single(
key,
partial(self, dims=dims, scenario_expr=scenario_expr),
"config",
*inputs,
)
def __call__(
self, config, *coords, dims: Tuple[str, ...], scenario_expr: str
) -> Quantity:
"""Invoke :meth:`quantify`, for use with :mod:`genno`."""
kw = dict(zip(dims, coords))
kw.update(scenario=eval(scenario_expr, dict(config=config)))
return self.quantify(**kw)
#: Common settings of ‘L’ow -20%, ‘M’edium = 0%, ‘H’igh = +20%.
LMH = Map(
"setting", L=Constant(0.8, "n y"), M=Constant(1.0, "n y"), H=Constant(1.2, "n y")
)
#: Exclude 2020 and 2025 from certain factors.
OMIT_2025 = Omit(y=[2020, 2025])
#: Common Factors for SSP quantification in transport.
COMMON = {
# “Share of Electric vehicles across SSPs”
# NB(PNK) The sign of these is reversed versus the spreadsheet, as indicated in the
# Slack thread: "high improvement" → "'L'ow investment cost".
# TODO Implement SSP4 as HIC=L, LIC=H (what about MIC?)
# TODO Interpret the note “?? / hybrid / e-fuel = We assume that 20% reduction in
# inv_cost for ICE_nga, ICAe_ffv, ICAm_ptrp, ICH_chyb, IGH_ghyb, IAHe_ptrp,
# IAHm_ptrp, HFC_ptrp over the relevant SSP trajectory” —under which conditions
# does this apply?
"ldv ev inv_cost": Factor(
[
LMH,
OMIT_2025,
Keep(t=["ELC_100", "PHEV_ptrp"]),
ScenarioSetting.of_enum(SSP_2024, "1=L 2=M 3=H 4=M 5=H", default="M"),
]
),
# “Occupancy of LDVs should differ across SSPs”
# TODO Implement SSP4 as "HIC: L, LIC: H"
"ldv load factor": Factor(
[
LMH,
OMIT_2025,
ScenarioSetting.of_enum(SSP_2024, "1=H 2=M 3=M 4=L 5=L", default="M"),
]
),
# “Difference in energy intensity of LDVs across SSPs”
# NB "High fuel economy" = "High efficiency" = *lower* input coefficient.
# TODO Implement SSP4 as “divergence (across and within)”
"ldv fuel economy": Factor(
[
LMH,
Constant(1.0, "t"),
OMIT_2025,
ScenarioSetting.of_enum(SSP_2024, "1=H 2=M 3=L 4=M 5=L", default="M"),
]
),
# “Mode share change by [AIR] across SSPs”
# NB To be applied as shift from (LDV + RAIL) modes to AIR mode.
"pdt air": Factor(
[LMH, ScenarioSetting.of_enum(SSP_2024, "1=L 2=M 3=H 4=M 5=H", default="M")]
),
# “Total pdt (active mode) differences across SSPs”
# This is implemented as the remaining share of PDT for "non-active" modes; that is,
# once active modes are subtracted.
#
# NB (PNK) Disabled to avoid decreases in PDT/capita from 2025→2030 and
# corresponding decrease in final energy input to transport. This should be
# replaced with a more complete representation of active mobility that starts in
# y₀ (or historical periods) and changes smoothly, rather than cutting in as of
# 2030.
#
# TODO Implement SSP4 as "HIC=H, MIC=M, LIC=M"
"pdt non-active": Factor(
[
Map(
"setting",
# H=Constant(0.9, "n y"),
# M=Constant(0.95, "n y"),
H=Constant(1.0, "n y"),
M=Constant(1.0, "n y"),
L=Constant(1.0, "n y"),
),
OMIT_2025,
ScenarioSetting.of_enum(SSP_2024, "1=H 2=M 3=L 4=M 5=L", default="M"),
]
),
# “Mode share of RAIL/PT across SSPs”
# NB(PNK) According to the Slack thread, the same factors are applied to shifts:
# - from LDV to RAIL (in the "urban public transport" *and* in "long-distance
# public modes")
# - from LDV to BUS.
# They are stored here as the same value, to be applied where indicated.
# TODO Interpret text: "SSP3=low/medium"
"pdt ldv → PT shift": Factor(
[LMH, ScenarioSetting.of_enum(SSP_2024, "1=H 2=M 3=L 4=M 5=L", default="M")]
),
# “Difference in electrification of RAIL across SSPs”
# NB(PNK) The sign of these are reversed versus the spread sheet: "high improvement
# [in cost]" → "Low cost".
# TODO Implement SSP4 as "HIC=L, MIC=H, LIC=H"
"rail inv_cost": Factor(
[
LMH,
Keep(t=["Hspeed_rai", "Mspeed_rai", "rail_pub"]),
ScenarioSetting.of_enum(SSP_2024, "1=L 2=M 3=H 4=M 5=H", default="M"),
]
),
}
[docs]def insert(c: Computer, key, *, name: str, target: Key, dims: str = "ny"):
"""Update `c` to apply the factor :py:`COMMON[name]` to `key`.
Use via :meth:`genno.Computer.apply`.
"""
k_target = Key(target)
dim_coord = {
"n": "n::ex world",
"t": "t::transport",
"y": "y::model",
}
coords = [dim_coord[d] for d in dims]
se = "config['transport'].ssp"
# Quantify the factor
c.add(k_target + "ssp factor", COMMON[name], *coords, scenario_expr=se)
# Multiply `key` by the factor
c.add(k_target, "mul", key, k_target + "ssp factor")
return k_target