Source code for message_ix_models.model.transport.ldv

"""Data for light-duty vehicles (LDVs) for passenger transport."""

import logging
from collections.abc import Mapping
from operator import itemgetter
from typing import TYPE_CHECKING, Any, cast

import genno
import pandas as pd
from genno import Computer, Key, Keys
from genno.core.key import single_key
from message_ix import make_df
from sdmx.model.common import Code

from message_ix_models.model import disutility
from message_ix_models.tools import exo_data
from message_ix_models.util import (
    ScenarioInfo,
    broadcast,
    convert_units,
    make_matched_dfs,
    merge_data,
    same_node,
)
from message_ix_models.util.genno import Collector

from .data import MaybeAdaptR11Source
from .emission import ef_for_input
from .key import activity_ldv_full, bcast_tcl, bcast_y, exo
from .util import EXTRAPOLATE, wildcard

if TYPE_CHECKING:
    from genno.types import AnyQuantity

    from message_ix_models.types import ParameterData

    from .config import Config

log = logging.getLogger(__name__)

#: Shorthand for tags on keys.
Li = "::LDV+ixmp"

#: Mapping from short dimension IDs to MESSAGE index names.
DIMS = dict(
    commodity="c",
    level="l",
    node_dest="n",
    node_loc="n",
    node_origin="n",
    technology="t",
    year_act="ya",
    year_vtg="yv",
)

#: Common, fixed values for :func:`.prepare_tech_econ` and :func:`.get_dummy`.
COMMON = dict(mode="all", time="year", time_dest="year", time_origin="year")

#: Target key that collects all data generated in this module.
TARGET = "transport::LDV+ixmp"


[docs] @exo_data.register_source class LDV(MaybeAdaptR11Source): """Provider of exogenous data on LDVs. Parameters ---------- source_kw : Must include exactly the keys "measure" (must be one of "fuel economy", "fix_cost", or "inv_cost"), "nodes", and "scenario". """ id = __name__ measures = {"inv_cost", "fuel economy", "fix_cost"} #: Names of expected files given :attr:`measure`. filename = { "inv_cost": "ldv-inv_cost.csv", "fuel economy": "ldv-fuel-economy.csv", "fix_cost": "ldv-fix_cost.csv", }
[docs] def __init__(self, source, source_kw) -> None: super().__init__(source, source_kw) # Use "exo" tag on the target key, to align with existing code in this module self.key = Key(f"{self.measure}:n-t-y:LDV+exo")
collect = Collector(TARGET, "{}::LDV+ixmp".format)
[docs] def prepare_computer(c: Computer): """Set up `c` to compute parameter data for light-duty-vehicle technologies. Results in a key :data:`TARGET` that triggers computation of :mod:`ixmp`-ready parameter data for LDV technologies. These computations respond to, *inter alia*, :attr:`.transport.Config.dummy_LDV`: - :any:`True`: :func:`get_dummy` is used. - :any:`False`: :func:`prepare_tech_econ` is used. In both cases, :func:`constraint_data` is used to generate constraint data. """ from genno import Key from . import factor collect.computer = c context = c.graph["context"] config: "Config" = context.transport info = config.base_model_info # Some keys/shorthand k = Keys( fe=Key("fuel economy:n-t-y:LDV"), eff=Key("efficiency:t-y-n:LDV"), factor_input=Key("input:t-y:LDV+factor"), ) t_ldv = "t::transport LDV" # Use .tools.exo_data.prepare_computer() to add tasks that load, adapt, and select # the appropriate data kw0 = dict(nodes=context.model.regions, scenario=str(config.ssp)) for kw0["measure"] in LDV.measures: exo_data.prepare_computer( context, c, source=__name__, source_kw=kw0, strict=False ) # Insert a scaling factor that varies according to SSP c.apply( factor.insert, k.fe + "exo", name="ldv fuel economy", target=k.fe, dims="nty" ) # Reciprocal value, i.e. from Gv km / GW a → GW a / Gv km c.add(k.eff[0], "div", genno.Quantity(1.0), k.fe) # Compute the input efficiency adjustment factor for the NAVIGATE project # TODO Move this to project-specific code c.add( k.factor_input, "factor_input", "y", "t::transport", "t::transport agg", "config", ) # Product of NAVIGATE input efficiency factor and LDV efficiency c.add(k.eff[1], "mul", k.factor_input, k.eff[0]) # Multiply by values from ldv-input-adj.csv. See file comment. Drop the 'scenario' # dimension; there is only one value in the file per 'n'. c.add("input:n:LDV+adj", "sum", exo.input_adj_ldv, dimensions=["scenario"]) c.add(k.eff[2], "mul", k.eff[1], "input:n:LDV+adj") ### Load factor # Interpolate on "y" dimension k.lf_nsy = Key(exo.load_factor_ldv) c.add(k.lf_nsy[0], "interpolate", k.lf_nsy, "y::coords", **EXTRAPOLATE) # Select load factor k.lf_ny = k.lf_nsy / "scenario" c.add(k.lf_ny[0], "select", k.lf_nsy[0], "indexers:scenario:LED") # Insert a scaling factor that varies according to SSP c.apply(factor.insert, k.lf_ny[0], name="ldv load factor", target=k.lf_ny) # Apply the function usage_data() for further processing collect("usage", usage_data, k.lf_ny, "cg", "n::ex world", t_ldv, "y::model") ### Technical lifetime tl, k_tl = "technical_lifetime", exo.lifetime_ldv # Interpolate on "yv" dimension c.add(k_tl[0], "interpolate", k_tl, "yv::coords", **EXTRAPOLATE) # Broadcast to all nodes, scenarios, and LDV technologies coords = ["scenario::all", "n::ex world", "t::LDV"] c.add(k_tl[1], "broadcast_wildcard", k_tl[0], *coords, dim=("scenario", "nl", "t")) # Select values for the current scenario c.add(k_tl[2] / "scenario", "select", k_tl[1], "indexers:scenario:LED") # Convert to integer # NB This is required because the MESSAGEix GAMS implementation cannot handle non- # integer values c.add(k_tl[3] / "scenario", lambda qty: qty.astype(int), k_tl[2] / "scenario") # Convert to MESSAGE data structure dims = dict(node_loc="nl", technology="t", year_vtg="yv") collect(tl, "as_message_df", k_tl[3] / "scenario", name=tl, dims=dims, common={}) ### Capacity factor cf, k_cf_s = "capacity_factor", exo.activity_ldv k_cf = k_cf_s / "scenario" # Convert units c.add(k_cf_s[0], "convert_units", k_cf_s, units="Mm/year") # Broadcast to all scenarios c.add(k_cf_s[1], "broadcast_wildcard", k_cf_s[0], "scenario::all", dim="scenario") # Select values for the current scenario c.add(k_cf[2], "select", k_cf_s[1], "indexers:scenario:LED") # Interpolate on "y" dimension c.add(k_cf["full"], "interpolate", k_cf[2], "y::coords", **EXTRAPOLATE) assert k_cf["full"] == activity_ldv_full # Add dimension "t" indexing all LDV technologies prev = c.add(k_cf[4] * "t", "expand_dims", k_cf["full"], "t::transport LDV") # Broadcast y → (yV, yA) prev = c.add(k_cf[5], "mul", prev, bcast_y.all) # Convert to MESSAGE data structure collect(cf, "as_message_df", prev, name=cf, dims=DIMS, common=COMMON) # Add further keys for MESSAGE-structured data # Techno-economic attributes # Select a task for the final step that computes "tech::LDV+ixmp" if config.dummy_LDV: collect("tech", get_dummy, "context") else: c.apply( prepare_tech_econ, efficiency=k.eff[2], inv_cost=Key("inv_cost:n-t-y:LDV+exo"), fix_cost=Key("fix_cost:n-t-y:LDV+exo"), ) # Constraints collect("constraints", constraint_data, "context") # Calculate base-period CAP_NEW and historical_new_capacity (‘sales’) if config.ldv_stock_method == "A": # Data from file ldv-new-capacity.csv try: k.stock = Key(c.full_key("cap_new::ldv+exo")) except KeyError: k.stock = Key("") # No such file in this configuration elif config.ldv_stock_method == "B": k.stock = single_key(c.apply(stock)) if k.stock: # Convert units c.add(k.stock[0], "convert_units", k.stock, units="million * vehicle / year") # historical_new_capacity: select only data prior to y₀ kw1: dict[str, Any] = dict( common={}, dims=dict(node_loc="nl", technology="t", year_vtg="yv"), name="historical_new_capacity", ) y_historical = list(filter(lambda y: y < info.y0, info.set["year"])) c.add(k.stock[1], "select", k.stock[0], indexers=dict(yv=y_historical)) collect(kw1["name"], "as_message_df", k.stock[1], **kw1) # CAP_NEW/bound_new_capacity_{lo,up} # - Select only data from y₀ and later. # - Discard values for ICE_conv. # TODO Do not hard code this label; instead, identify the technology with the # largest share and avoid setting constraints on it. # - Add both upper and lower constraints to ensure the solution contains exactly # the given value. c.add(k.stock[2], "select", k.stock[0], indexers=dict(yv=info.Y)) indexers = dict(t=["ICE_conv"]) c.add(k.stock[3], "select", k.stock[2], indexers=indexers, inverse=True) for kw1["name"] in map("bound_new_capacity_{}".format, ("lo", "up")): collect(kw1["name"], "as_message_df", k.stock[3], **kw1) # Add the data to the target scenario c.add("transport_data", __name__, key=TARGET)
[docs] def prepare_tech_econ( c: Computer, *, efficiency: Key, inv_cost: Key, fix_cost: Key ) -> None: """Prepare `c` to calculate techno-economic parameters for LDVs. This prepares `k_target` to return a data structure with MESSAGE-ready data for the parameters ``input``, ``ouput``, ``fix_cost``, and ``inv_cost``. """ # Identify periods to include # FIXME Avoid hard-coding this period c.add("y::LDV", lambda y: list(filter(lambda x: 1995 <= x, y)), "y") # Create base quantity for "output" parameter k = output_base = Key("output:n-t-y:LDV+base") c.add(k[0], wildcard(1.0, "Gv km", k.dims)) # Broadcast over (n, t, y) dimensions coords = ["n::ex world", "t::LDV", "y::model"] c.add(k[1], "broadcast_wildcard", k[0], *coords, dim=k.dims) # Broadcast `exo.input_share` over (c, t) dimensions. This produces a large Quantity # with 1.0 everywhere except explicit entries in the input data file. # NB Order matters here k = exo.input_share coords = ["t::LDV", "c::transport+base", "y"] # NB include historical periods c.add(k[0], "broadcast_wildcard", k, *coords, dim=k.dims) # Multiply by `bcast_tcl.input` to keep only the entries that correspond to actual # input commodities of particular technologies. input_bcast = c.add("input broadcast::LDV", "mul", k[0], bcast_tcl.input) ### Convert input and output to MESSAGE data structure for par_name, base, bcast in ( ("input", efficiency, input_bcast), ("output", output_base[1], bcast_tcl.output), ): k = Key(par_name, base.dims, "LDV") # Extend data over missing periods in the model horizon c.add(k[0], "extend_y", base, "y::LDV") # Broadcast from (y) to (yv, ya) dims to produce the full quantity for # input/output efficiency prev = c.add(k[1], "mul", k[0], bcast, bcast_y.all) # Convert to ixmp/MESSAGEix-structured pd.DataFrame c.add(k[2], "as_message_df", prev, name=par_name, dims=DIMS, common=COMMON) # Convert to target units and append to `TARGET` collect(par_name, convert_units, k[2], "transport info") ### Transform costs kw = dict(fill_value="extrapolate") for name, base in (("fix_cost", fix_cost), ("inv_cost", inv_cost)): prev = c.add(f"{name}::LDV+0", "interpolate", base, "y::coords", kwargs=kw) prev = c.add(f"{name}::LDV+1", "mul", prev, bcast_y.all) collect(name, "as_message_df", prev, name=name, dims=DIMS, common=COMMON) ### Compute CO₂ emissions factors # Extract the 'input' data frame other = Key("other::LDV") c.add(other[0], itemgetter("input"), f"input{Li}") # Apply ef_for_input; append to `TARGET` collect("emission_factor", ef_for_input, "context", other[0], species="CO2")
[docs] def get_dummy(context) -> "ParameterData": """Generate dummy, equal-cost output for each LDV technology.""" # Information about the target structure config: "Config" = context.transport info = config.base_model_info # List of years to include years = list(filter(lambda y: y >= 2010, info.set["year"])) # List of LDV technologies all_techs = config.spec.add.set["technology"] ldv_techs = list(map(str, all_techs[all_techs.index("LDV")].child)) # 'output' parameter values: all 1.0 (ACT units == output units) # - Broadcast across nodes. # - Broadcast across LDV technologies. # - Add commodity ID based on technology ID. output = ( make_df( "output", value=1.0, year_act=years, year_vtg=years, unit="Gv km", level="useful", **COMMON, ) .pipe(broadcast, node_loc=info.N[1:], technology=ldv_techs) .assign(commodity=lambda df: "transport vehicle " + df["technology"]) .pipe(same_node) ) # Discard rows for the historical LDV technology beyond 2010 output = output[~output.eval("technology == 'ICE_L_ptrp' and year_vtg > 2010")] # Add matching data for 'capacity_factor' and 'var_cost' data = make_matched_dfs(output, capacity_factor=1.0, var_cost=1.0) data["output"] = output return data
[docs] def constraint_data(context) -> "ParameterData": """Return constraints on light-duty vehicle technology activity and usage. Responds to the :attr:`.Config.constraint` key :py:`"LDV growth_activity"`; see description there. """ config: "Config" = context.transport # Information about the target structure info = config.base_model_info years = info.Y[1:] # Technologies as a hierarchical code list techs = config.spec.add.set["technology"] ldv_techs = techs[techs.index("LDV")].child # All technologies in the spec, as strings all_techs = list(map(str, techs)) # List of technologies to constrain, including the LDV technologies, plus the # corresponding "X usage by CG" pseudo-technologies constrained: list[Code] = [] for t in map(str, ldv_techs): constrained.extend(filter(lambda _t: t in _t, all_techs)) # type: ignore data: dict[str, pd.DataFrame] = dict() for bound in "lo", "up": name = f"growth_activity_{bound}" # Retrieve the constraint value from configuration value = config.constraint[f"LDV {name}"] # Assemble the data data[name] = make_df( name, value=value, year_act=years, time="year", unit="-" ).pipe(broadcast, node_loc=info.N[1:], technology=constrained) if bound == "lo": continue # Add initial_activity_up values allowing usage to begin in any period name = f"initial_activity_{bound}" data[name] = make_df( name, value=1e6, year_act=years, time="year", unit="-" ).pipe(broadcast, node_loc=info.N[1:], technology=constrained) # Prevent new capacity from being constructed for techs annotated # "historical-only: True" historical_only_techs = list( filter(lambda t: t.eval_annotation("historical-only") is True, techs) ) name = "bound_new_capacity_up" data[name] = make_df(name, year_vtg=info.Y, value=0.0, unit="-").pipe( broadcast, node_loc=info.N[1:], technology=historical_only_techs ) return data
[docs] def stock(c: Computer) -> Key: """Prepare `c` to compute base-period stock and historical sales.""" from .key import ldv_ny k = Key("stock:n-y:LDV") # - Divide total LDV activity by (1) annual driving distance per vehicle and (2) # load factor (occupancy) to obtain implied stock. # - Correct units: "load factor ldv:n-y" is dimensionless, should be # passenger/vehicle # - Select only the base-period value. c.add(k[0], "div", ldv_ny + "total", activity_ldv_full) c.add(k[1], "div", k[0], "load factor ldv:n-y:exo") c.add(k[2], "div", k[1], genno.Quantity(1.0, units="passenger / vehicle")) c.add(k[3] / "y", "select", k[2], "y0::coord") # Multiply by exogenous technology shares to obtain stock with (n, t) dimensions c.add("stock:n-t:LDV", "mul", k[3] / "y", exo.t_share_ldv) # TODO Move the following 4 calls to .build.add_structure() or similar # Identify the subset of periods up to and including y0 c.add( "y::to y0", lambda periods, y0: dict(y=list(filter(lambda y: y <= y0, periods))), "y", "y0", ) # Convert duration_period to Quantity c.add("duration_period:y", "duration_period", "info") # Duration_period up to and including y0 c.add("duration_period:y:to y0", "select", "duration_period:y", "y::to y0") # Groups for aggregating annual to period data c.add("y::annual agg", "groups_y_annual", "duration_period:y") # Fraction of sales in preceding years (annual, not MESSAGE 'year' referring to # multi-year periods) c.add("sales fraction:n-t-y:LDV", "sales_fraction_annual", exo.age_ldv) # Absolute sales in preceding years c.add("sales:n-t-y:LDV+annual", "mul", "stock:n-t:LDV", "sales fraction:n-t-y:LDV") # Aggregate to model periods; total sales across the period c.add( "sales:n-t-y:LDV+total", "aggregate", "sales:n-t-y:LDV+annual", "y::annual agg", keep=False, ) # Divide by duration_period for the equivalent of CAP_NEW/historical_new_capacity c.add("sales:n-t-y:LDV", "div", "sales:n-t-y:LDV+total", "duration_period:y") # Rename dimensions to match those expected in prepare_computer(), above k_result = Key("sales:nl-t-yv:LDV") c.add(k_result, "rename_dims", "sales:n-t-y:LDV", name_dict={"n": "nl", "y": "yv"}) return k_result
[docs] def usage_data( load_factor: "AnyQuantity", cg: list["Code"], nodes: list[str], t_ldv: Mapping[str, list], years: list, ) -> "ParameterData": """Generate data for LDV “usage pseudo-technologies”. These technologies convert commodities like "transport ELC_100 vehicle" (that is, vehicle-distance traveled) into "transport pax RUEAM" (that is, passenger-distance traveled). These data incorporate: 1. Load factor, in the ``output`` efficiency. 2. Required consumption of a "disutility" commodity, in ``input``. """ from .structure import TEMPLATE info = ScenarioInfo(set={"node": nodes, "year": years}) # Regenerate the Spec for the disutility formulation spec = disutility.get_spec(groups=cg, technologies=t_ldv["t"], template=TEMPLATE) data = disutility.data_conversion(info, spec) # Apply load factor cols = list(data["output"].columns[:-2]) unit = data["output"]["unit"].unique()[0] rename = cast(Mapping, {"n": "node_loc", "y": "year_act"}) data["output"] = ( ( genno.Quantity(data["output"].set_index(cols)["value"]) * load_factor.rename(rename) ) .to_dataframe() .reset_index() .assign(unit=unit) ) # Add a source that produces the "disutility" commodity merge_data(data, disutility.data_source(info, spec)) return data