# Source code for message_ix_models.model.transport.ldv

"""Data for light-duty vehicles (LDVs) for passenger transport."""

import logging
from collections import defaultdict
from functools import lru_cache
from typing import TYPE_CHECKING, Any, Dict, List, Mapping, cast

import genno
import pandas as pd
from genno import Computer, Key, KeySeq
from message_ix import make_df
from openpyxl import load_workbook
from sdmx.model.v21 import Code

from message_ix_models.model import disutility
from message_ix_models.model.structure import get_codes
from message_ix_models.tools import exo_data
from message_ix_models.util import (
    ScenarioInfo,
    broadcast,
    cached,
    check_support,
    make_io,
    make_matched_dfs,
    merge_data,
    minimum_version,
    package_data_path,
    same_node,
)

from . import files as exo
from .data import MaybeAdaptR11Source
from .emission import ef_for_input
from .operator import extend_y
from .util import input_commodity_level

if TYPE_CHECKING:
    from genno.types import AnyQuantity

    from .config import Config

log = logging.getLogger(__name__)


@exo_data.register_source
class LDV(MaybeAdaptR11Source):
    """Provider of exogenous data on LDVs.

    The class is registered with :mod:`.tools.exo_data` through the
    :func:`.exo_data.register_source` decorator.

    Parameters
    ----------
    source_kw :
       Must include exactly the keys "measure" (must be one of "fuel economy",
       "fix_cost", or "inv_cost"), "nodes", and "scenario".
    """

    #: ID of the source: the fully-qualified name of this module.
    id = __name__

    #: Measures that can be requested through `source_kw`.
    measures = {"inv_cost", "fuel economy", "fix_cost"}

    #: Name of the data file associated with each measure.
    filename = {
        "inv_cost": "ldv-inv_cost.csv",
        "fuel economy": "ldv-fuel-economy.csv",
        "fix_cost": "ldv-fix_cost.csv",
    }

    def __init__(self, source, source_kw) -> None:
        # NOTE(review): presumably the parent class validates `source_kw` and sets
        # `self.measure`, which is read below; confirm in MaybeAdaptR11Source
        super().__init__(source, source_kw)

        # Use "exo" tag on the target key, to align with existing code in this module
        target = f"ldv {self.measure}:n-t-y:exo"
        self.key = Key(target)
def prepare_computer(c: Computer):
    """Set up `c` to compute techno-economic data for light-duty-vehicle technologies.

    Results in a key ``transport ldv::ixmp`` that triggers computation of
    :mod:`ixmp`-ready parameter data for LDV technologies; this key is also added to
    ``transport_data``. These computations respond to :attr:`.DataSourceConfig.LDV`:

    - :obj:`None`: :func:`get_dummy` is used.
    - “US-TIMES MA3T”: :func:`get_USTIMES_MA3T` is used.

    In both cases, :func:`constraint_data` is used to generate constraints,
    :func:`usage_data` for usage pseudo-technologies, and :func:`capacity_factor` for
    capacity factors.

    Base-period new capacity and ``historical_new_capacity`` are added only if
    :attr:`ldv_stock_method` is "A" and the key ``cap_new::ldv+exo`` exists in `c`, or
    if it is "B" (in which case :func:`stock` is applied). For any other value, these
    data are omitted.

    Parameters
    ----------
    c :
        Computer whose graph contains the key "context".

    Raises
    ------
    ValueError
        If :attr:`.DataSourceConfig.LDV` is neither :obj:`None` nor "US-TIMES MA3T".
    """
    from genno.core.attrseries import AttrSeries

    from . import factor

    context = c.graph["context"]
    config: "Config" = context.transport
    source = config.data_source.LDV
    info = config.base_model_info

    # Use .tools.exo_data.prepare_computer() to add tasks that load, adapt, and select
    # the appropriate data
    for measure in LDV.measures:
        exo_data.prepare_computer(
            context,
            c,
            source=__name__,
            source_kw=dict(
                measure=measure,
                nodes=context.model.regions,
                scenario=str(config.ssp),
            ),
            strict=False,
        )

    # Insert a scaling factor that varies according to SSP
    k_fe = Key("ldv fuel economy:n-t-y")
    c.apply(factor.insert, k_fe + "exo", name=k_fe.name, target=k_fe, dims="nty")

    # Reciprocal value, i.e. from Gv km / GW a → GW a / Gv km
    k_eff = Key("ldv efficiency:t-y-n")
    c.add(k_eff, "div", genno.Quantity(1.0), k_fe)

    # Compute the input efficiency adjustment factor for the NAVIGATE project
    # TODO Move this to project-specific code
    k2 = Key("transport input factor:t-y")
    c.add(k2, "factor_input", "y", "t::transport", "t::transport agg", "config")

    # Product of NAVIGATE input factor and LDV efficiency
    c.add(k_eff + "adj+0", "mul", k2, k_eff)

    # Multiply by values from ldv-input-adj.csv. See file comment. Drop the 'scenario'
    # dimension; there is only one value in the file per 'n'.
    c.add("ldv input adj:n", "sum", exo.input_adj_ldv, dimensions=["scenario"])
    c.add(k_eff + "adj", "mul", k_eff + "adj+0", "ldv input adj:n")

    # Select a task for the final step that computes "ldv tech::ixmp"
    final = {
        "US-TIMES MA3T": (
            get_USTIMES_MA3T,
            "context",
            k_eff + "adj",
            "ldv inv_cost:n-t-y:exo",
            "ldv fix_cost:n-t-y:exo",
        ),
        None: (get_dummy, "context"),
    }.get(source)

    if final is None:
        raise ValueError(f"invalid source for LDV data: {source}")

    # Interpolate load factor
    lf_nsy = Key("load factor ldv:scenario-n-y")
    c.add(
        lf_nsy + "0",
        "interpolate",
        lf_nsy + "exo",
        "y::coords",
        kwargs=dict(fill_value="extrapolate"),
    )

    # Select load factor
    lf_ny = lf_nsy / "scenario"
    c.add(lf_ny + "0", "select", lf_nsy + "0", "indexers:scenario")

    # Insert a scaling factor that varies according to SSP
    c.apply(factor.insert, lf_ny + "0", name="ldv load factor", target=lf_ny)

    # Keys to be included in combined data
    keys = []

    t_ldv = "t::transport LDV"

    # Extend (forward fill) lifetime to cover all periods
    c.add(exo.lifetime_ldv + "0", "extend_y", exo.lifetime_ldv, "y", dim="yv")
    # Broadcast to all nodes
    c.add(
        "lifetime:nl-yv:ldv",
        "broadcast_n",
        exo.lifetime_ldv + "0",
        "n::ex world",
        dim="nl",
    )
    # Broadcast to all LDV technologies
    # TODO Use a named operator like genno.operator.expand_dims, instead of the method
    #      of the AttrSeries class
    c.add("lifetime:nl-t-yv:ldv", AttrSeries.expand_dims, "lifetime:nl-yv:ldv", t_ldv)

    # Convert to MESSAGE data structure
    keys.append(Key("technical_lifetime::ldv+ixmp"))
    c.add(
        keys[-1],
        "as_message_df",
        "lifetime:nl-t-yv:ldv",
        name=keys[-1].name,
        dims=dict(node_loc="nl", technology="t", year_vtg="yv"),
        common={},
    )

    # Add further keys for MESSAGE-structured data
    # Techno-economic attributes
    keys.append("ldv tech::ixmp")
    c.add(keys[-1], *final)

    # Usage
    keys.append("ldv usage::ixmp")
    c.add(keys[-1], usage_data, lf_ny, "cg", "n::ex world", t_ldv, "y::model")

    # Constraints
    keys.append("ldv constraints::ixmp")
    c.add(keys[-1], constraint_data, "context")

    # Capacity factor
    keys.append("ldv capacity_factor::ixmp")
    c.add(keys[-1], capacity_factor, exo.activity_ldv, t_ldv, "y", "broadcast:y-yv-ya")

    # Calculate base-period CAP_NEW and historical_new_capacity (‘sales’)
    # `k` stays None when no new-capacity data are available, so that the check below
    # never refers to an unbound name
    k = None
    if config.ldv_stock_method == "A":
        # Data from file ldv-new-capacity.csv
        try:
            k = Key(c.full_key("cap_new::ldv+exo"))
        except KeyError:
            # No such file in this configuration
            log.debug("No key 'cap_new::ldv+exo'; omit LDV new capacity data")
    elif config.ldv_stock_method == "B":
        k = c.apply(stock)

    kw: Dict[str, Any] = dict(
        dims=dict(node_loc="nl", technology="t", year_vtg="yv"), common={}
    )
    if k is not None:
        # historical_new_capacity: select only data prior to y₀
        kw.update(name="historical_new_capacity")
        y_historical = [y for y in info.set["year"] if y < info.y0]
        c.add(k + "1", "select", k, indexers=dict(yv=y_historical))
        keys.append(c.add("ldv hnc::ixmp", "as_message_df", k + "1", **kw))

        # CAP_NEW/bound_new_capacity_{lo,up}
        # - Select only data from y₀ and later.
        # - Discard values for ICE_conv.
        #   TODO Do not hard code this label; instead, identify the technology with the
        #   largest share and avoid setting constraints on it.
        # - Add both upper and lower constraints to ensure the solution contains exactly
        #   the given value.
        c.add(k + "2", "select", k, indexers=dict(yv=info.Y))
        c.add(k + "3", "select", k + "2", indexers=dict(t=["ICE_conv"]), inverse=True)
        for s in ("lo", "up"):
            kw.update(name=f"bound_new_capacity_{s}")
            keys.append(c.add(f"ldv bnc_{s}::ixmp", "as_message_df", k + "3", **kw))

    # Merge all of the above and register the result as transport data
    k_all = "transport ldv::ixmp"
    c.add(k_all, "merge_data", *keys)
    c.add("transport_data", __name__, key=k_all)
#: Input file containing structured data about LDV technologies. #: #: For R11, this data is from the US-TIMES and MA3T models. FILE = "ldv-cost-efficiency.xlsx" #: (parameter name, cell range, units) for data to be read from multiple sheets in the #: :data:`FILE`. TABLES = { "fuel economy": (slice("B3", "Q15"), "Gv km / (GW year)"), "inv_cost": (slice("B33", "Q45"), "USD / vehicle"), "fix_cost": (slice("B62", "Q74"), "USD / vehicle"), }
@cached
def read_USTIMES_MA3T(nodes: List[str], subdir=None) -> Mapping[str, "AnyQuantity"]:
    """Read the US-TIMES MA3T data from :data:`FILE`.

    No transformation is performed.

    **NB** this function takes only simple arguments (`nodes` and `subdir`) so that
    :func:`.cached` computes the same key every time to avoid the slow step of opening/
    reading the large spreadsheet. :func:`get_USTIMES_MA3T` then conforms the data to
    particular context settings.

    Parameters
    ----------
    nodes :
        Node IDs. The part of each ID after the last "_", in lower case, selects the
        worksheet named ``MESSAGE_LDV_{…}``.
    subdir :
        Optional subdirectory of the "transport" package data directory that contains
        :data:`FILE`.

    Returns
    -------
    dict of (str → Quantity)
        One entry per key of :data:`TABLES`, each with dimensions (n, t, y) and the
        units given in :data:`TABLES`.
    """
    # Open workbook
    path = package_data_path("transport", subdir or "", FILE)
    wb = load_workbook(path, read_only=True, data_only=True)

    # Tables
    data = defaultdict(list)

    # A workbook opened in read-only mode keeps the file open until it is explicitly
    # closed; ensure this happens even if a sheet or cell range is missing
    try:
        # Iterate over regions/nodes
        for node in map(str, nodes):
            # Worksheet for this region
            sheet_node = node.split("_")[-1].lower()
            sheet = wb[f"MESSAGE_LDV_{sheet_node}"]

            # Read tables for efficiency, investment, and fixed O&M cost
            # NB fix_cost varies by distance driven, thus this is the value for average
            #    driving.
            # TODO calculate the values for modest and frequent driving
            for par_name, (cells, _) in TABLES.items():
                # Materialize the cell values now, while the workbook is open
                df = pd.DataFrame(list(sheet[cells])).map(lambda c: c.value)

                # - Make the first row the headers.
                # - Drop extra columns.
                # - Use 'MESSAGE name' as the technology name.
                # - Melt to long format.
                # - Year as integer.
                # - Assign "node" and "unit" columns.
                # - Drop NA values (e.g. ICE_L_ptrp after the first year).
                data[par_name].append(
                    df.iloc[1:, :]
                    .set_axis(df.loc[0, :], axis=1)
                    .drop(["Technology", "Description"], axis=1)
                    .rename(columns={"MESSAGE name": "t"})
                    .melt(id_vars=["t"], var_name="y")
                    .astype({"y": int})
                    .assign(n=node)
                    .dropna(subset=["value"])
                )
    finally:
        wb.close()

    # Combine data frames, convert to Quantity
    qty = {}
    for par_name, dfs in data.items():
        qty[par_name] = genno.Quantity(
            pd.concat(dfs, ignore_index=True).set_index(["n", "t", "y"]),
            units=TABLES[par_name][1],
            name=par_name,
        )

    return qty
def get_USTIMES_MA3T(
    context, efficiency: "AnyQuantity", inv_cost: "AnyQuantity", fix_cost: "AnyQuantity"
) -> Dict[str, pd.DataFrame]:
    """Prepare LDV data from US-TIMES and MA3T.

    .. todo:: Some calculations are performed in the spreadsheet; transfer to code.
    .. todo:: Values for intermediate time periods e.g. 2025 are forward-filled from
       the next earlier period, e.g. 2020; interpolate instead.

    Parameters
    ----------
    context :
        Context; its ``transport`` attribute (:class:`.Config`) supplies the base model
        info and the spec.
    efficiency :
        Input efficiency with dimensions (n, t, y).
    inv_cost :
        Investment cost with dimensions (n, t, y).
    fix_cost :
        Fixed cost with dimensions (n, t, y).

    Returns
    -------
    dict of (str → pd.DataFrame)
        Data for the ``input``, ``output``, ``inv_cost``, and ``fix_cost`` parameters,
        plus the entries returned by :func:`.ef_for_input` for CO₂.
    """
    from message_ix_models.util import convert_units

    # Compatibility checks
    check_support(
        context,
        settings=dict(regions=frozenset(["R11", "R12", "R14"])),
        desc="US-TIMES and MA3T data available",
    )

    # Retrieve configuration and ScenarioInfo
    config: "Config" = context.transport
    info = config.base_model_info
    spec = config.spec

    # Merge with base model commodity information for io_units() below
    # TODO this duplicates code in .ikarus; move to a common location
    all_info = ScenarioInfo()
    all_info.set["commodity"].extend(get_codes("commodity"))
    all_info.update(spec.add)

    # Retrieve the input data
    data = dict(efficiency=efficiency, inv_cost=inv_cost, fix_cost=fix_cost)

    # Years to include
    # FIXME Avoid hard-coding this period
    target_years = list(filter(lambda y: 1995 <= y, info.set["year"]))

    # Extend over missing periods in the model horizon
    data = {name: extend_y(qty, target_years) for name, qty in data.items()}

    # Prepare "input" and "output" parameter data from `efficiency`
    name = "efficiency"
    base = data.pop(name).to_series().rename("value").reset_index()
    common = dict(mode="all", time="year", time_dest="year", time_origin="year")
    i_o = make_io(
        src=(None, None, f"{efficiency.units:~}"),
        dest=(None, "useful", "Gv km"),
        efficiency=base["value"],
        on="input",
        # Other dimensions
        node_loc=base["n"],
        technology=base["t"].astype(str),
        year_vtg=base["y"],
        **common,
    )

    # Assign input commodity and level according to the technology
    result = {}
    result["input"] = (
        input_commodity_level(context, i_o["input"], default_level="final")
        .pipe(broadcast, year_act=info.Y)
        .query("year_act >= year_vtg")
        .pipe(same_node)
    )

    # Convert units to the model's preferred input units for each commodity
    @lru_cache
    def _io_units(t, c, l):  # noqa: E741
        return all_info.io_units(t, c, l)

    target_units = (
        result["input"]
        .apply(
            lambda row: _io_units(row["technology"], row["commodity"], row["level"]),
            axis=1,
        )
        .unique()
    )
    # The conversion below applies a single target unit to every row
    assert 1 == len(target_units), f"Non-unique input units: {target_units}"

    result["input"]["value"] = convert_units(
        result["input"]["value"],
        {"value": (1.0, f"{efficiency.units:~}", target_units[0])},
    )

    # Assign output commodity based on the technology name
    result["output"] = (
        i_o["output"]
        .assign(commodity=lambda df: "transport vehicle " + df["technology"])
        .pipe(broadcast, year_act=info.Y)
        .query("year_act >= year_vtg")
        .pipe(same_node)
    )

    # Transform costs
    for name in "fix_cost", "inv_cost":
        # Rename the values to "value", as for `efficiency` above, so that this does
        # not depend on the name attached to the input quantity
        base = data[name].to_series().rename("value").reset_index()
        result[name] = make_df(
            name,
            node_loc=base["n"],
            technology=base["t"],
            year_vtg=base["y"],
            value=base["value"],
            unit=f"{data[name].units:~}",
        )

    # fix_cost has a year_act dimension; keep only periods at or after the vintage
    result["fix_cost"] = (
        result["fix_cost"]
        .pipe(broadcast, year_act=info.Y)
        .query("year_act >= year_vtg")
    )

    # Compute CO₂ emissions factors
    result.update(ef_for_input(context, result["input"], species="CO2"))

    return result
def get_dummy(context) -> Dict[str, pd.DataFrame]:
    """Generate dummy, equal-cost output for each LDV technology.

    Returns
    -------
    dict of (str → pd.DataFrame)
        Data for ``output``, plus matching ``capacity_factor`` and ``var_cost`` data
        with all values 1.0.
    """
    # Information about the target structure
    config: "Config" = context.transport
    info = config.base_model_info

    # Periods to include: 2010 and later
    years = [y for y in info.set["year"] if y >= 2010]

    # IDs of the children of the "LDV" technology code
    technologies = config.spec.add.set["technology"]
    ldv_parent = technologies[technologies.index("LDV")]
    ldv_techs = [str(code) for code in ldv_parent.child]

    # 'output' parameter values: all 1.0 (ACT units == output units)
    template = make_df(
        "output",
        value=1.0,
        year_act=years,
        year_vtg=years,
        unit="Gv km",
        level="useful",
        mode="all",
        time="year",
        time_dest="year",
    )
    # Broadcast across nodes and LDV technologies
    output = broadcast(template, node_loc=info.N[1:], technology=ldv_techs)
    # Add commodity ID based on technology ID
    output = output.assign(
        commodity=lambda df: "transport vehicle " + df["technology"]
    )
    output = same_node(output)

    # Discard rows for the historical LDV technology beyond 2010
    is_late_historical = output.eval(
        "technology == 'ICE_L_ptrp' and year_vtg > 2010"
    )
    output = output[~is_late_historical]

    # Add matching data for 'capacity_factor' and 'var_cost'
    data = make_matched_dfs(output, capacity_factor=1.0, var_cost=1.0)
    data["output"] = output

    return data
@minimum_version("message_ix 3.6")
def capacity_factor(
    qty: "AnyQuantity", t_ldv: dict, y, y_broadcast: "AnyQuantity"
) -> Dict[str, pd.DataFrame]:
    """Return capacity factor data for LDVs.

    The data are:

    - Broadcast across all |yV|, |yA| (`y_broadcast`), and LDV technologies (`t_ldv`).
    - Converted to units of "Mm / year".
    - Converted to :mod:`message_ix` parameter format using :func:`.as_message_df`.

    Parameters
    ----------
    qty
        Input data, for instance from file :file:`ldv-activity.csv`, with dimension
        |n|.
    t_ldv
        The structure :py:`"t::transport LDV"`, mapping the key "t" to the list of LDV
        technologies.
    y
        All periods, including pre-model periods.
    y_broadcast
        The structure :py:`"broadcast:y-yv-ya"`.
    """
    from genno.operator import convert_units

    # The operator has moved between message_ix versions; try the newer location first
    try:
        from message_ix.report.operator import as_message_df
    except ImportError:
        from message_ix.reporting.computations import as_message_df

    # Expand along `y`, then multiply to obtain the (yv, ya) dimensions
    expanded = qty.expand_dims(y=y) * y_broadcast
    # TODO determine units from technology annotations
    data = convert_units(expanded, "Mm / year")

    name = "capacity_factor"
    dims = {"node_loc": "n", "year_vtg": "yv", "year_act": "ya"}
    common = {"time": "year"}

    # TODO Remove typing exclusion once message_ix is updated for genno 1.25
    result = as_message_df(data, name, dims, common)  # type: ignore [arg-type]

    # Repeat the same values for every LDV technology
    result[name] = broadcast(result[name], technology=t_ldv["t"])

    return result
def constraint_data(context) -> Dict[str, pd.DataFrame]:
    """Return constraints on light-duty vehicle technology activity and usage.

    Responds to the :attr:`.Config.constraint` key :py:`"LDV growth_activity"`; see
    description there.

    Returns
    -------
    dict of (str → pd.DataFrame)
        Data for ``growth_activity_lo``, ``growth_activity_up``,
        ``initial_activity_up``, and ``bound_new_capacity_up``.
    """
    config: "Config" = context.transport

    # Information about the target structure
    info = config.base_model_info
    years = info.Y[1:]
    nodes = info.N[1:]

    # Technologies as a hierarchical code list
    techs = config.spec.add.set["technology"]
    ldv_techs = techs[techs.index("LDV")].child

    # All technologies in the spec, as strings
    all_techs = [str(t) for t in techs]

    # List of technologies to constrain, including the LDV technologies, plus the
    # corresponding "X usage by CG" pseudo-technologies: every technology whose ID
    # contains the ID of an LDV technology
    constrained: List[str] = [
        candidate
        for ldv_id in map(str, ldv_techs)
        for candidate in all_techs
        if ldv_id in candidate
    ]

    def _activity_constraint(par_name: str, value) -> pd.DataFrame:
        """Return `value` for `par_name`, broadcast over nodes and technologies."""
        frame = make_df(par_name, value=value, year_act=years, time="year", unit="-")
        return frame.pipe(broadcast, node_loc=nodes, technology=constrained)

    data: Dict[str, pd.DataFrame] = {}

    for bound in ("lo", "up"):
        name = f"growth_activity_{bound}"

        # Retrieve the constraint value from configuration and assemble the data
        data[name] = _activity_constraint(name, config.constraint[f"LDV {name}"])

        if bound == "up":
            # Add initial_activity_up values allowing usage to begin in any period
            name = f"initial_activity_{bound}"
            data[name] = _activity_constraint(name, 1e6)

    # Prevent new capacity from being constructed for techs annotated
    # "historical-only: True"
    historical_only_techs = [
        t for t in techs if t.eval_annotation("historical-only") is True
    ]
    name = "bound_new_capacity_up"
    zero_bound = make_df(name, year_vtg=info.Y, value=0.0, unit="-")
    data[name] = zero_bound.pipe(
        broadcast, node_loc=info.N[1:], technology=historical_only_techs
    )

    return data
def stock(c: Computer) -> Key:
    """Prepare `c` to compute base-period stock and historical sales.

    Returns
    -------
    Key
        ``sales:nl-t-yv:ldv``, the key for sales with dimensions renamed to those
        used in :func:`prepare_computer`.
    """
    from .key import ldv_ny

    k = KeySeq("stock:n-y:ldv")

    # - Divide total LDV activity by (1) annual driving distance per vehicle and (2)
    #   load factor (occupancy) to obtain implied stock.
    # - Correct units: "load factor ldv:n-y" is dimensionless, should be
    #   passenger/vehicle
    # - Select only the base-period value.
    unit_correction = genno.Quantity(1.0, units="passenger / vehicle")
    k_base = k[3] / "y"
    c.add(k[0], "div", ldv_ny + "total", exo.activity_ldv)
    c.add(k[1], "div", k[0], "load factor ldv:n-y")
    c.add(k[2], "div", k[1], unit_correction)
    c.add(k_base, "select", k[2], "y0::coord")

    # Multiply by exogenous technology shares to obtain stock with (n, t) dimensions
    k_stock = "stock:n-t:ldv"
    c.add(k_stock, "mul", k_base, exo.t_share_ldv)

    # TODO Move the following 4 calls to .build.add_structure() or similar
    def _periods_to_y0(periods, y0):
        """Return indexers for the subset of `periods` up to and including `y0`."""
        return dict(y=[y for y in periods if y <= y0])

    # Identify the subset of periods up to and including y0
    c.add("y::to y0", _periods_to_y0, "y", "y0")
    # Convert duration_period to Quantity
    k_dp = "duration_period:y"
    c.add(k_dp, "duration_period", "info")
    # Duration_period up to and including y0
    c.add("duration_period:y:to y0", "select", k_dp, "y::to y0")
    # Groups for aggregating annual to period data
    c.add("y::annual agg", "groups_y_annual", k_dp)

    # Fraction of sales in preceding years (annual, not MESSAGE 'year' referring to
    # multi-year periods)
    k_fraction = "sales fraction:n-t-y:ldv"
    c.add(k_fraction, "sales_fraction_annual", exo.age_ldv)
    # Absolute sales in preceding years
    k_annual = "sales:n-t-y:ldv+annual"
    c.add(k_annual, "mul", k_stock, k_fraction)
    # Aggregate to model periods; total sales across the period
    k_total = "sales:n-t-y:ldv+total"
    c.add(k_total, "aggregate", k_annual, "y::annual agg", keep=False)
    # Divide by duration_period for the equivalent of CAP_NEW/historical_new_capacity
    k_sales = "sales:n-t-y:ldv"
    c.add(k_sales, "div", k_total, k_dp)

    # Rename dimensions to match those expected in prepare_computer(), above
    result = Key("sales:nl-t-yv:ldv")
    c.add(result, "rename_dims", k_sales, name_dict={"n": "nl", "y": "yv"})

    return result
def usage_data(
    load_factor: "AnyQuantity",
    cg: List["Code"],
    nodes: List[str],
    t_ldv: Mapping[str, List],
    years: List,
) -> Mapping[str, pd.DataFrame]:
    """Generate data for LDV usage technologies.

    These technologies convert commodities like "transport ELC_100 vehicle" (i.e.
    vehicle-distance traveled) into "transport pax RUEAM" (i.e. passenger-distance
    traveled). These data incorporate:

    1. Load factor, in the ``output`` efficiency.
    2. Required consumption of a "disutility" commodity, in ``input``.

    Parameters
    ----------
    load_factor :
        Load factor; its dimensions "n" and "y" are renamed to "node_loc" and
        "year_act" before multiplication.
    cg :
        Consumer groups, passed to :func:`.disutility.get_spec`.
    nodes :
        Nodes for which to generate data.
    t_ldv :
        Mapping from the key "t" to the list of LDV technologies.
    years :
        Periods for which to generate data.
    """
    from .structure import TEMPLATE

    info = ScenarioInfo(set={"node": nodes, "year": years})

    # Regenerate the Spec for the disutility formulation
    spec = disutility.get_spec(
        groups=cg, technologies=t_ldv["t"], template=TEMPLATE
    )

    data = disutility.data_conversion(info, spec)

    # Apply load factor
    output = data["output"]
    # All columns except the last two are used as index dimensions
    # NOTE(review): assumes the last two columns are "value" and "unit"; confirm
    index_cols = list(output.columns[:-2])
    unit = output["unit"].unique()[0]
    rename = cast(Mapping, {"n": "node_loc", "y": "year_act"})

    base_values = genno.Quantity(output.set_index(index_cols)["value"])
    scaled = base_values * load_factor.rename(rename)
    data["output"] = scaled.to_dataframe().reset_index().assign(unit=unit)

    # Add a source that produces the "disutility" commodity
    merge_data(data, disutility.data_source(info, spec))

    return data