"""Data for light-duty vehicles (LDVs) for passenger transport."""
import logging
from collections import defaultdict
from functools import lru_cache, partial
from operator import itemgetter, le
from typing import TYPE_CHECKING, Any, Dict, List, Mapping, cast
import genno
import pandas as pd
from genno import Computer, quote
from genno.operator import load_file
from message_ix import make_df
from openpyxl import load_workbook
from sdmx.model.v21 import Code
from message_ix_models.model import disutility
from message_ix_models.model.structure import get_codes
from message_ix_models.util import (
ScenarioInfo,
adapt_R11_R12,
adapt_R11_R14,
broadcast,
cached,
check_support,
make_io,
make_matched_dfs,
merge_data,
minimum_version,
package_data_path,
same_node,
)
from message_ix_models.util.ixmp import rename_dims
from .emission import ef_for_input
from .operator import extend_y
from .util import input_commodity_level
if TYPE_CHECKING:
from genno.types import AnyQuantity
from .config import Config
log = logging.getLogger(__name__)
def prepare_computer(c: Computer):
    """Set up `c` to compute techno-economic data for light-duty-vehicle technologies.

    Results in a key ``ldv::ixmp`` that triggers computation of :mod:`ixmp`-ready
    parameter data for LDV technologies. These computations respond to
    :attr:`.DataSourceConfig.LDV`:

    - :obj:`None`: :func:`get_dummy` is used.
    - “US-TIMES MA3T”: :func:`get_USTIMES_MA3T` is used.

    In both cases, :func:`get_constraints` is used to generate constraints.
    """
    from genno import Key

    from . import factor

    # Shorthand for the transport configuration attached to the Context
    context = c.graph["context"]
    config: "Config" = context.transport
    source = config.data_source.LDV
    info = config.base_model_info

    # Add all the following computations, even if they will not be used
    k1 = Key("US-TIMES MA3T")
    c.add(k1 + "R11", read_USTIMES_MA3T_2, None, quote("R11"))

    # Operator for adapting R11 data to the model's node list, if any
    adapt = {"R12": adapt_R11_R12, "R14": adapt_R11_R14}.get(context.model.regions)
    if adapt:
        c.add(k1 + "exo", adapt, k1 + "R11")  # Adapt
    else:
        c.add(k1 + "exo", k1 + "R11")  # Alias

    # Extract the separate quantities, one per table in the source data
    for name in TABLES:
        k2 = Key(f"ldv {name}:n-t-y:exo")
        c.add(k2, itemgetter(name), k1 + "exo")

    # Insert a scaling factor that varies according to SSP
    k_fe = Key("ldv fuel economy:n-t-y")
    c.apply(factor.insert, k_fe + "exo", name=k_fe.name, target=k_fe, dims="nty")

    # Reciprocal value, i.e. from Gv km / GW a → GW a / Gv km
    k_eff = Key("ldv efficiency:t-y-n")
    c.add(k_eff, "div", genno.Quantity(1.0), k_fe)

    # Compute the input efficiency adjustment factor for the NAVIGATE project
    # TODO Move this to project-specific code
    k2 = Key("transport input factor:t-y")
    c.add(k2, "factor_input", "y", "t::transport", "t::transport agg", "config")

    # Product of NAVIGATE input factor and LDV efficiency
    c.add(k_eff + "adj+0", "mul", k2, k_eff)

    # Multiply by values from ldv-input-adj.csv. See file comment. Drop the 'scenario'
    # dimension; there is only one value in the file per 'n'.
    c.add(
        "ldv input adj:n",
        "sum",
        "ldv input adj:n-scenario:exo",
        dimensions=["scenario"],
    )
    c.add(k_eff + "adj", "mul", k_eff + "adj+0", "ldv input adj:n")

    # Select a task for the final step that computes "ldv::ixmp"
    final = {
        "US-TIMES MA3T": (
            get_USTIMES_MA3T,
            "context",
            k_eff + "adj",
            "ldv inv_cost:n-t-y:exo",
            "ldv fix_cost:n-t-y:exo",
        ),
        None: (get_dummy, "context"),
    }.get(source)
    if final is None:
        # NOTE(review): message says "non-LDV" but this function handles LDV data —
        # looks like a copy/paste from another module; confirm and correct upstream.
        raise ValueError(f"invalid source for non-LDV data: {source}")

    # Interpolate load factor over all model periods, extrapolating beyond the data
    lf_nsy = Key("load factor ldv:scenario-n-y")
    c.add(
        lf_nsy + "0",
        "interpolate",
        lf_nsy + "exo",
        "y::coords",
        kwargs=dict(fill_value="extrapolate"),
    )

    # Select load factor for the current scenario; drops the 'scenario' dimension
    lf_ny = lf_nsy / "scenario"
    c.add(lf_ny + "0", "select", lf_nsy + "0", "indexers:scenario")

    # Insert a scaling factor that varies according to SSP
    c.apply(factor.insert, lf_ny + "0", name="ldv load factor", target=lf_ny)

    # Keys of ixmp-format data to be merged into the final result
    keys = [
        c.add("ldv tech::ixmp", *final),
        c.add(
            "ldv usage::ixmp",
            usage_data,
            lf_ny,
            "cg",
            "n::ex world",
            "t::transport LDV",
            "y::model",
        ),
        c.add("ldv constraints::ixmp", constraint_data, "context"),
        c.add(
            "ldv capacity_factor::ixmp",
            capacity_factor,
            "ldv activity:n:exo",
            "t::transport LDV",
            "y",
            "broadcast:y-yv-ya",
        ),
    ]

    # Add data from file ldv-new-capacity.csv, if the key exists in the graph
    try:
        k = Key(c.full_key("cap_new::ldv+exo"))
    except KeyError:
        pass  # No such file in this configuration
    else:
        kw: Dict[str, Any] = dict(
            dims=dict(node_loc="nl", technology="t", year_vtg="yv"), common={}
        )

        # historical_new_capacity: select only data prior to y₀
        kw.update(name="historical_new_capacity")
        y_historical = list(filter(lambda y: y < info.y0, info.set["year"]))
        c.add(k + "1", "select", k, indexers=dict(yv=y_historical))
        keys.append(c.add("ldv hnc::ixmp", "as_message_df", k + "1", **kw))

        # bound_new_capacity_{lo,up}: select only data from y₀ and later
        c.add(k + "2", "select", k, indexers=dict(yv=info.Y))
        for s in "lo", "up":
            kw.update(name=f"bound_new_capacity_{s}")
            keys.append(c.add(f"ldv bnc_{s}::ixmp", "as_message_df", k + "2", **kw))

    # TODO add bound_activity constraints for first year given technology shares
    # TODO add historical_new_capacity for period prior to first year

    # Merge all the partial data and register with the "transport_data" collector
    k_all = "transport ldv::ixmp"
    c.add(k_all, "merge_data", *keys)
    c.add("transport_data", __name__, key=k_all)
#: Input file containing structured data about LDV technologies.
#:
#: For R11, this data is from the US-TIMES and MA3T models.
FILE = "ldv-cost-efficiency.xlsx"

#: Mapping from parameter name to (cell range, units) for data to be read from
#: multiple sheets in :data:`FILE`.
TABLES = {
    "fuel economy": (slice("B3", "Q15"), "Gv km / (GW year)"),
    "inv_cost": (slice("B33", "Q45"), "USD / vehicle"),
    "fix_cost": (slice("B62", "Q74"), "USD / vehicle"),
}
@cached
def read_USTIMES_MA3T(nodes: List[str], subdir=None) -> Mapping[str, "AnyQuantity"]:
    """Read the US-TIMES MA3T data from :data:`FILE`.

    No transformation is performed.

    **NB** this function takes only simple arguments (`nodes` and `subdir`) so that
    :func:`.cached` computes the same key every time to avoid the slow step of opening/
    reading the large spreadsheet. :func:`get_USTIMES_MA3T` then conforms the data to
    particular context settings.

    Parameters
    ----------
    nodes
        Node (region) codes; one worksheet is read for each.
    subdir
        Optional subdirectory of the "transport" package data directory in which to
        locate :data:`FILE`.

    Returns
    -------
    dict of (str → genno.Quantity)
        One quantity per key of :data:`TABLES`, each with dimensions (n, t, y).
    """
    # Open workbook
    path = package_data_path("transport", subdir or "", FILE)
    wb = load_workbook(path, read_only=True, data_only=True)

    # Tables, collected per parameter name
    data = defaultdict(list)

    try:
        # Iterate over regions/nodes
        for node in map(str, nodes):
            # Worksheet for this region, e.g. "MESSAGE_LDV_nam" for "R11_NAM"
            sheet_node = node.split("_")[-1].lower()
            sheet = wb[f"MESSAGE_LDV_{sheet_node}"]

            # Read tables for efficiency, investment, and fixed O&M cost
            # NB fix_cost varies by distance driven, thus this is the value for average
            # driving.
            # TODO calculate the values for modest and frequent driving
            for par_name, (cells, _) in TABLES.items():
                df = pd.DataFrame(list(sheet[cells])).map(lambda c: c.value)

                # - Make the first row the headers.
                # - Drop extra columns.
                # - Use 'MESSAGE name' as the technology name.
                # - Melt to long format.
                # - Year as integer.
                # - Assign "node" and "unit" columns.
                # - Drop NA values (e.g. ICE_L_ptrp after the first year).
                data[par_name].append(
                    df.iloc[1:, :]
                    .set_axis(df.loc[0, :], axis=1)
                    .drop(["Technology", "Description"], axis=1)
                    .rename(columns={"MESSAGE name": "t"})
                    .melt(id_vars=["t"], var_name="y")
                    .astype({"y": int})
                    .assign(n=node)
                    .dropna(subset=["value"])
                )
    finally:
        # Read-only workbooks keep the underlying file handle open; release it even if
        # a sheet is missing or a table fails to parse
        wb.close()

    # Combine data frames, convert to Quantity
    qty = {}
    for par_name, dfs in data.items():
        qty[par_name] = genno.Quantity(
            pd.concat(dfs, ignore_index=True).set_index(["n", "t", "y"]),
            units=TABLES[par_name][1],
            name=par_name,
        )

    return qty
def read_USTIMES_MA3T_2(nodes: Any, subdir=None) -> Dict[str, "AnyQuantity"]:
    """Same as :func:`read_USTIMES_MA3T`, but from CSV files."""

    def _read_one(name: str) -> "AnyQuantity":
        # One CSV file per quantity, e.g. "ldv-fuel-economy.csv"; forward-fill
        # along the 'y' dimension after loading
        filename = f"ldv-{name.replace(' ', '-')}.csv"
        qty = load_file(
            path=package_data_path("transport", subdir or "", filename),
            dims=rename_dims(),
            name=name,
        )
        return qty.ffill("y")

    return {name: _read_one(name) for name in ("fix_cost", "fuel economy", "inv_cost")}
def get_USTIMES_MA3T(
    context, efficiency: "AnyQuantity", inv_cost: "AnyQuantity", fix_cost: "AnyQuantity"
) -> Dict[str, pd.DataFrame]:
    """Prepare LDV data from US-TIMES and MA3T.

    .. todo:: Some calculations are performed in the spreadsheet; transfer to code.
    .. todo:: Values for intermediate time periods e.g. 2025 are forward-filled from
       the next earlier period, e.g. 2020; interpolate instead.

    Returns
    -------
    dict of (str → pd.DataFrame)
        Data for the ``input``, ``output``, ``capacity_factor``,
        ``technical_lifetime``, ``inv_cost``, and ``fix_cost`` parameters.
    """
    from message_ix_models.util import convert_units

    # Compatibility checks
    check_support(
        context,
        settings=dict(regions=frozenset(["R11", "R12", "R14"])),
        desc="US-TIMES and MA3T data available",
    )

    # Retrieve configuration and ScenarioInfo
    config: "Config" = context.transport
    technical_lifetime = config.ldv_lifetime["average"]
    info = config.base_model_info
    spec = config.spec

    # Merge with base model commodity information for io_units() below
    # TODO this duplicates code in .ikarus; move to a common location
    all_info = ScenarioInfo()
    all_info.set["commodity"].extend(get_codes("commodity"))
    all_info.update(spec.add)

    # Retrieve the input data
    data = dict(efficiency=efficiency, inv_cost=inv_cost, fix_cost=fix_cost)

    # Years to include: 2010 and later (partial(le, 2010) tests 2010 <= y)
    target_years = list(filter(partial(le, 2010), info.set["year"]))
    # Extend over missing periods in the model horizon
    data = {name: extend_y(qty, target_years) for name, qty in data.items()}

    # Prepare "input" and "output" parameter data from `efficiency`
    name = "efficiency"
    base = data.pop(name).to_series().rename("value").reset_index()
    common = dict(mode="all", time="year", time_dest="year", time_origin="year")
    i_o = make_io(
        src=(None, None, f"{efficiency.units:~}"),
        dest=(None, "useful", "Gv km"),
        efficiency=base["value"],
        on="input",
        node_loc=base["n"],  # Other dimensions
        technology=base["t"].astype(str),
        year_vtg=base["y"],
        **common,
    )

    # Assign input commodity and level according to the technology
    result = {}
    result["input"] = (
        input_commodity_level(context, i_o["input"], default_level="final")
        .pipe(broadcast, year_act=info.Y)
        .query("year_act >= year_vtg")
        .pipe(same_node)
    )

    # Convert units to the model's preferred input units for each commodity.
    # Memoized so io_units() runs once per distinct (t, c, l) combination rather
    # than once per row.
    @lru_cache
    def _io_units(t, c, l):  # noqa: E741
        return all_info.io_units(t, c, l)

    target_units = (
        result["input"]
        .apply(
            lambda row: _io_units(row["technology"], row["commodity"], row["level"]),
            axis=1,
        )
        .unique()
    )
    # All rows are expected to share a single target unit
    assert 1 == len(target_units)

    result["input"]["value"] = convert_units(
        result["input"]["value"],
        {"value": (1.0, f"{efficiency.units:~}", target_units[0])},
    )

    # Assign output commodity based on the technology name
    result["output"] = (
        i_o["output"]
        .assign(commodity=lambda df: "transport vehicle " + df["technology"])
        .pipe(broadcast, year_act=info.Y)
        .query("year_act >= year_vtg")
        .pipe(same_node)
    )

    # Add technical lifetimes matching the dimensions of "output"
    result.update(
        make_matched_dfs(base=result["output"], technical_lifetime=technical_lifetime)
    )

    # Transform costs. NB base[name] is the value column: to_series() preserves
    # the Quantity's name, which read_USTIMES_MA3T* set to the parameter name.
    for name in "fix_cost", "inv_cost":
        base = data[name].to_series().reset_index()
        result[name] = make_df(
            name,
            node_loc=base["n"],
            technology=base["t"],
            year_vtg=base["y"],
            value=base[name],
            unit=f"{data[name].units:~}",
        )
    # fix_cost also has a year_act dimension; broadcast and keep year_act ≥ year_vtg
    result["fix_cost"] = (
        result["fix_cost"]
        .pipe(broadcast, year_act=info.Y)
        .query("year_act >= year_vtg")
    )

    # Compute CO₂ emissions factors from the "input" data
    result.update(ef_for_input(context, result["input"], species="CO2"))

    return result
def get_dummy(context) -> Dict[str, pd.DataFrame]:
    """Generate dummy, equal-cost output for each LDV technology."""
    # Information about the target structure
    config: "Config" = context.transport
    info = config.base_model_info

    # Years to include: 2010 and later
    model_years = [y for y in info.set["year"] if y >= 2010]

    # LDV technologies: children of the "LDV" code in the spec's technology list
    tech_codes = config.spec.add.set["technology"]
    ldv_group = tech_codes[tech_codes.index("LDV")]
    ldv_tech_names = [str(t) for t in ldv_group.child]

    # 'output' parameter values: all 1.0 (ACT units == output units)
    base = make_df(
        "output",
        value=1.0,
        year_act=model_years,
        year_vtg=model_years,
        unit="Gv km",
        level="useful",
        mode="all",
        time="year",
        time_dest="year",
    )

    # - Broadcast across nodes (excluding the first, "World") and LDV technologies.
    # - Add commodity ID based on technology ID.
    # - Destination node same as node_loc.
    output = base.pipe(broadcast, node_loc=info.N[1:], technology=ldv_tech_names)
    output = output.assign(
        commodity=lambda df: "transport vehicle " + df["technology"]
    ).pipe(same_node)

    # Discard rows for the historical LDV technology beyond 2010
    historical_mask = output.eval("technology == 'ICE_L_ptrp' and year_vtg > 2010")
    output = output[~historical_mask]

    # Add matching data for 'capacity_factor' and 'var_cost'
    data = make_matched_dfs(output, capacity_factor=1.0, var_cost=1.0)
    data["output"] = output
    return data
@minimum_version("message_ix 3.6")
def capacity_factor(
    qty: "AnyQuantity", t_ldv: dict, y, y_broadcast: "AnyQuantity"
) -> Dict[str, pd.DataFrame]:
    """Return capacity factor data for LDVs.

    The data are:

    - Broadcast across all |yV|, |yA| (`y_broadcast`), and LDV technologies
      (`t_ldv`).
    - Converted to :mod:`message_ix` parameter format using :func:`.as_message_df`.

    Parameters
    ----------
    qty
        Input data, for instance from file :file:`ldv-activity.csv`, with
        dimension |n|.
    t_ldv
        The structure :py:`"t::transport LDV"`, mapping the key "t" to the list of
        LDV technologies.
    y
        All periods, including pre-model periods.
    y_broadcast
        The structure :py:`"broadcast:y-yv-ya"`.
    """
    from genno.operator import convert_units

    # message_ix ≥3.7 renamed .reporting to .report; support both layouts
    try:
        from message_ix.report.operator import as_message_df
    except ImportError:
        from message_ix.reporting.computations import as_message_df

    # TODO determine units from technology annotations
    data = convert_units(qty.expand_dims(y=y) * y_broadcast, "Mm / year")

    name = "capacity_factor"
    dims = dict(node_loc="n", year_vtg="yv", year_act="ya")
    # TODO Remove typing exclusion once message_ix is updated for genno 1.25
    result = as_message_df(data, name, dims, dict(time="year"))  # type: ignore [arg-type]

    # Broadcast the single data frame across the LDV technologies
    result[name] = result[name].pipe(broadcast, technology=t_ldv["t"])

    return result
def constraint_data(context) -> Dict[str, pd.DataFrame]:
    """Return constraints on light-duty vehicle technology activity and usage.

    Responds to the :attr:`.Config.constraint` key :py:`"LDV growth_activity"`; see
    description there.
    """
    config: "Config" = context.transport

    # Information about the target structure
    info = config.base_model_info
    years = info.Y[1:]

    # Technologies as a hierarchical code list
    techs = config.spec.add.set["technology"]
    ldv_techs = techs[techs.index("LDV")].child

    # All technologies in the spec, as strings
    all_techs = list(map(str, techs))

    # List of technologies to constrain, including the LDV technologies, plus the
    # corresponding "X usage by CG" pseudo-technologies, identified by substring
    # match against the LDV technology ID
    constrained: List[str] = []
    for t in map(str, ldv_techs):
        # NB the lambda captures `t` late, but filter() is consumed immediately by
        # extend(), so each iteration sees its own `t`
        constrained.extend(filter(lambda _t: t in _t, all_techs))

    data: Dict[str, pd.DataFrame] = dict()
    for bound in "lo", "up":
        name = f"growth_activity_{bound}"
        # Retrieve the constraint value from configuration
        value = config.constraint[f"LDV {name}"]
        # Assemble the data; broadcast over all nodes except the first ("World")
        data[name] = make_df(
            name, value=value, year_act=years, time="year", unit="-"
        ).pipe(broadcast, node_loc=info.N[1:], technology=constrained)

    # Prevent new capacity from being constructed for techs annotated
    # "historical-only: True"
    historical_only_techs = list(
        filter(lambda t: t.eval_annotation("historical-only") is True, techs)
    )
    name = "bound_new_capacity_up"
    data[name] = make_df(name, year_vtg=info.Y, value=0.0, unit="-").pipe(
        broadcast, node_loc=info.N[1:], technology=historical_only_techs
    )

    return data
def usage_data(
    load_factor: "AnyQuantity",
    cg: List["Code"],
    nodes: List[str],
    t_ldv: Mapping[str, List],
    years: List,
) -> Mapping[str, pd.DataFrame]:
    """Generate data for LDV usage technologies.

    These technologies convert commodities like "transport ELC_100 vehicle" (i.e.
    vehicle-distance traveled) into "transport pax RUEAM" (i.e. passenger-distance
    traveled). These data incorporate:

    1. Load factor, in the ``output`` efficiency.
    2. Required consumption of a "disutility" commodity, in ``input``.
    """
    from .structure import TEMPLATE

    # Minimal ScenarioInfo carrying only the node and year structure
    info = ScenarioInfo(set={"node": nodes, "year": years})

    # Regenerate the Spec for the disutility formulation
    spec = disutility.get_spec(
        groups=cg,
        technologies=t_ldv["t"],
        template=TEMPLATE,
    )

    data = disutility.data_conversion(info, spec)

    # Apply load factor: multiply the "output" values by `load_factor`
    # Index columns: all but the last two ("value", "unit")
    cols = list(data["output"].columns[:-2])
    # Assumes all "output" rows share one unit — TODO confirm; [0] would silently
    # pick the first if there were several
    unit = data["output"]["unit"].unique()[0]
    # Align load_factor dims with the parameter's column names
    rename = cast(Mapping, {"n": "node_loc", "y": "year_act"})
    data["output"] = (
        (
            genno.Quantity(data["output"].set_index(cols)["value"])
            * load_factor.rename(rename)
        )
        .to_dataframe()
        .reset_index()
        .assign(unit=unit)
    )

    # Add a source that produces the "disutility" commodity
    merge_data(data, disutility.data_source(info, spec))

    return data