Source code for message_ix_models.model.transport.ikarus

"""Prepare non-LDV data from the IKARUS model via :file:`GEAM_TRP_techinput.xlsx`."""

import logging
from collections import defaultdict
from functools import lru_cache, partial
from operator import le
from typing import TYPE_CHECKING, Dict

import pandas as pd
import xarray as xr
from genno import Computer, Key, KeySeq, Quantity, quote
from genno.core.key import single_key
from iam_units import registry
from message_ix import make_df
from openpyxl import load_workbook

from message_ix_models.model.structure import get_codes
from message_ix_models.util import (
    ScenarioInfo,
    broadcast,
    cached,
    convert_units,
    make_matched_dfs,
    nodes_ex_world,
    package_data_path,
    same_node,
    same_time,
    series_of_pint_quantity,
)

from .non_ldv import UNITS
from .util import input_commodity_level

if TYPE_CHECKING:
    from .config import Config

log = logging.getLogger(__name__)

#: Name of the input file.
#:
#: The input file uses the old MESSAGE V names for parameters:
#:
#: - inv_cost = inv
#: - fix_cost = fom
#: - technical_lifetime = pll
#: - input (efficiency) = minp
#: - output (efficiency) = moutp
#: - capacity_factor = plf
FILE = "GEAM_TRP_techinput.xlsx"

#: Mapping from parameters to 3-tuples of units:
#:
#: 1. Factor for units appearing in the input file.
#: 2. Units appearing in the input file.
#: 3. Target units for MESSAGEix-GLOBIOM.
_UNITS = dict(
    # Appearing in input file
    inv_cost=(1.0e6, "EUR_2000 / vehicle", "MUSD_2005 / vehicle"),
    fix_cost=(1000.0, "EUR_2000 / vehicle / year", "MUSD_2005 / vehicle / year"),
    var_cost=(0.01, "EUR_2000 / kilometer", None),
    technical_lifetime=(1.0, "year", None),
    availability=(100, "kilometer / vehicle / year", None),
    # NB this is written as "GJ / km" in the file
    input=(0.01, "GJ / (vehicle kilometer)", None),
    output=(1.0, "", None),
    # Created below
    capacity_factor=(1.0, None, None),
)
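

# Illustrative sketch, added for exposition (not part of the original module): how
# one _UNITS entry is consumed by message_ix_models.util.convert_units(), mirroring
# its use in read_ikarus_data() below. The "inv_cost" entry means raw spreadsheet
# magnitudes are scaled by 1.0e6, interpreted as EUR_2000 / vehicle, and converted
# to MUSD_2005 / vehicle.
def _demo_units_entry() -> pd.Series:
    # The series name selects the (factor, units_in, units_out) entry in _UNITS
    s = pd.Series([3.0, 4.0], name="inv_cost")
    # With store="quantity", each element of the result is a pint Quantity
    # expressed in the target units
    return convert_units(s, unit_info=_UNITS, store="quantity")
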

#: Rows and columns appearing in each table in :data:`FILE`; see :data:`SOURCE`.
_SHEET_INDEX = dict(
    index=[
        "inv_cost",
        "fix_cost",
        "var_cost",
        "technical_lifetime",
        "availability",
        "input",
        "output",
    ],
    columns=[2000, 2005, 2010, 2015, 2020, 2025, 2030],
)

#: Mapping from technology IDs (keys) to 3-tuples (values) giving:
#:
#: 1. the source index entry in the extracted files,
#: 2. the technology index entry in the extracted files, and
#: 3. the starting and final cells delimiting the corresponding table in :data:`FILE`.
SOURCE = {
    "rail_pub": ("IKARUS", "regional train electric efficient", "C103:I109"),
    "drail_pub": ("IKARUS", "commuter train diesel efficient", "C37:I43"),
    "dMspeed_rai": ("IKARUS", "intercity train diesel efficient", "C125:I131"),
    "Mspeed_rai": ("IKARUS", "intercity train electric efficient", "C147:I153"),
    "Hspeed_rai": ("IKARUS", "high speed train efficient", "C169:I175"),
    "con_ar": ("Krey/Linßen", "Airplane jet", "C179:I185"),
    # Same parametrization as 'con_ar' (per cell references in spreadsheet):
    "conm_ar": ("Krey/Linßen", "Airplane jet", "C179:I185"),
    "conE_ar": ("Krey/Linßen", "Airplane jet", "C179:I185"),
    "conh_ar": ("Krey/Linßen", "Airplane jet", "C179:I185"),
    "ICE_M_bus": ("Krey/Linßen", "Bus diesel", "C197:I203"),
    "ICE_H_bus": ("Krey/Linßen", "Bus diesel efficient", "C205:I211"),
    "ICG_bus": ("Krey/Linßen", "Bus CNG", "C213:I219"),
    # Same parametrization as 'ICG_bus'. Conversion factors will be applied.
    "ICAe_bus": ("Krey/Linßen", "Bus CNG", "C213:I219"),
    "ICH_bus": ("Krey/Linßen", "Bus CNG", "C213:I219"),
    "PHEV_bus": ("Krey/Linßen", "Bus CNG", "C213:I219"),
    "FC_bus": ("Krey/Linßen", "Bus CNG", "C213:I219"),
    # Both equivalent to 'FC_bus'
    "FCg_bus": ("Krey/Linßen", "Bus CNG", "C213:I219"),
    "FCm_bus": ("Krey/Linßen", "Bus CNG", "C213:I219"),
    "Trolley_bus": ("Krey/Linßen", "Bus electric", "C229:I235"),
}
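

# Illustrative sketch, added for exposition (not part of the original module): how
# the cell-range strings in SOURCE are used by read_ikarus_data() below. Each range
# such as "C103:I109" spans 7 rows (the parameters in _SHEET_INDEX) by 7 columns
# (the years 2000-2030).
def _demo_cell_range(sheet) -> pd.DataFrame:
    """Read the 'rail_pub' table from an already-open openpyxl worksheet `sheet`."""
    *_, cell_range = SOURCE["rail_pub"]  # "C103:I109"
    # openpyxl accepts a slice of cell labels, returning a tuple of row tuples
    rows = sheet[slice(*cell_range.split(":"))]
    # Label rows/columns and extract the value from each cell object
    return pd.DataFrame(list(rows), **_SHEET_INDEX).applymap(lambda c: c.value)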


def make_indexers(*args) -> Dict[str, xr.DataArray]:
    """Return indexers corresponding to :data:`SOURCE`.

    These can be used for :mod:`xarray`-style advanced indexing to select from the
    data in the IKARUS CSV files using the dimensions (source, t), yielding a new
    dimension ``t_new``.
    """
    t_new, source, t = zip(*[(k, v[0], v[1]) for k, v in SOURCE.items()])
    return dict(
        source=xr.DataArray(list(source), coords={"t_new": list(t_new)}),
        t=xr.DataArray(list(t), coords={"t_new": list(t_new)}),
    )
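

# Illustrative sketch, added for exposition (not part of the original module; toy
# labels): the xarray "vectorized" indexing performed with indexers like those
# returned by make_indexers(). Selecting with two DataArrays that share the
# coordinate "t_new" picks one (source, t) pair per entry and collapses both
# dimensions into the single new dimension "t_new".
def _demo_advanced_indexing() -> xr.DataArray:
    da = xr.DataArray(
        [[1.0, 2.0], [3.0, 4.0]],
        coords={"source": ["A", "B"], "t": ["x", "y"]},
        dims=["source", "t"],
    )
    indexers = dict(
        source=xr.DataArray(["A", "B"], coords={"t_new": ["x2", "y2"]}, dims=["t_new"]),
        t=xr.DataArray(["x", "y"], coords={"t_new": ["x2", "y2"]}, dims=["t_new"]),
    )
    # 1-D result over "t_new" with values [1.0, 4.0], i.e. entries (A, x) and (B, y)
    return da.sel(**indexers)
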

def make_output(
    input_data: Dict[str, pd.DataFrame], techs
) -> Dict[str, pd.DataFrame]:
    """Make ``output`` data corresponding to IKARUS ``input`` data."""
    result = make_matched_dfs(
        input_data["input"], output=registry.Quantity(1.0, UNITS["output"])
    )

    @lru_cache
    def c_for(t: str) -> str:
        """Return e.g. "transport vehicle rail" for a specific rail technology `t`."""
        return f"transport vehicle {techs[techs.index(t)].parent.id.lower()}"

    # - Set "commodity" and "level" labels.
    # - Set units.
    # - Fill "node_dest" and "time_dest".
    result["output"] = (
        result["output"]
        .assign(commodity=lambda df: df["technology"].apply(c_for), level="useful")
        .pipe(same_node)
        .pipe(same_time)
    )
    return result
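

# Illustrative sketch, added for exposition (not part of the original module): the
# lookup performed by c_for() above, using a stand-in class in place of the sdmx
# Code objects in `techs`. Codes compare equal to their string IDs, so list.index()
# locates the technology; its parent code gives the mode.
def _demo_c_for() -> str:
    class _Code:
        """Stand-in for an sdmx Code."""

        def __init__(self, id, parent=None):
            self.id, self.parent = id, parent

        def __eq__(self, other):
            return self.id == other

    techs = [_Code("ICE_M_bus", parent=_Code("BUS"))]
    # Returns "transport vehicle bus"
    return f"transport vehicle {techs[techs.index('ICE_M_bus')].parent.id.lower()}"
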

@cached
def read_ikarus_data(occupancy, k_output, k_inv_cost):
    """Read the IKARUS data from :data:`FILE`.

    No transformation is performed.

    **NB** this function takes only simple arguments so that :func:`.cached` computes
    the same key every time, avoiding the slow step of opening/reading the
    spreadsheet. :func:`get_ikarus_data` then conforms the data to particular context
    settings.

    .. note:: superseded by the computations set up by :func:`prepare_computer`.
    """
    # Open the input file using openpyxl
    wb = load_workbook(
        package_data_path("transport", FILE), read_only=True, data_only=True
    )
    # Open the 'updateTRPdata' sheet
    sheet = wb["updateTRPdata"]

    # 'technology name' -> pd.DataFrame
    dfs = {}
    for tec, (*_, cell_range) in SOURCE.items():
        # - Read values from the table for one technology, e.g. "regional train
        #   electric efficient" = rail_pub.
        # - Extract the value from each openpyxl cell object.
        # - Set all non-numeric values to NaN.
        # - Transpose so that each variable is in one column.
        # - Convert from input units to desired units.
        df = (
            pd.DataFrame(list(sheet[slice(*cell_range.split(":"))]), **_SHEET_INDEX)
            .applymap(lambda c: c.value)
            .apply(pd.to_numeric, errors="coerce")
            .transpose()
            .apply(convert_units, unit_info=_UNITS, store="quantity")
        )

        # Convert IKARUS data to MESSAGEix-scheme parameters
        # TODO handle "availability" to provide distance_nonldv

        # Output efficiency: occupancy multiplied by an efficiency factor from config
        # NB this no longer depends on the file contents, and could be moved out of
        #    this function.
        output = registry.Quantity(
            occupancy[tec], "passenger / vehicle"
        ) * k_output.get(tec, 1.0)
        df["output"] = series_of_pint_quantity(
            [output] * len(df.index), index=df.index
        )

        df["inv_cost"] *= k_inv_cost.get(tec, 1.0)

        # Include variable cost * availability in fix_cost
        df["fix_cost"] += df["availability"] * df["var_cost"]

        # Store
        dfs[tec] = df.drop(columns=["availability", "var_cost"])

    # Finished reading IKARUS data from spreadsheet
    wb.close()

    # - Concatenate to a pd.DataFrame with technology and param as columns.
    # - Reformat as a pd.Series with a 3-level index: year, technology, param.
    return (
        pd.concat(dfs, axis=1, names=["technology", "param"])
        .rename_axis(index="year")
        .stack(["technology", "param"])
    )

def prepare_computer(c: Computer):
    """Prepare `c` to perform model data preparation using IKARUS data."""
    # TODO identify whether capacity_factor is needed
    c.configure(rename_dims={"source": "source"})

    c.add_single("ikarus indexers", quote(make_indexers()))
    c.add_single("y::ikarus", lambda data: list(filter(partial(le, 2000), data)), "y")
    k_u = c.add("ikarus adjust units", Quantity(1.0, units="(vehicle year) ** -1"))

    # NB this (harmlessly) duplicates an addition in .ldv.prepare_computer()
    # TODO deduplicate
    k_fi = c.add(
        "factor_input",
        "transport input factor:t-y",
        "y",
        "t::transport",
        "t::transport agg",
        "config",
    )

    parameters = ["fix_cost", "input", "inv_cost", "technical_lifetime", "var_cost"]

    # For as_message_df(), common mapping from message_ix dimension IDs to short IDs
    # in computed quantities
    dims_common = dict(commodity="c", node_loc="n", node_origin="n", technology="t")
    # For as_message_df(), fixed values for all data
    common = dict(mode="all", time="year", time_origin="year")

    # Create a chain of tasks for each parameter
    final = {}
    for name in ["availability"] + parameters:
        # Base key for computations related to parameter `name`
        ks = KeySeq(f"ikarus {name}:c-t-y")

        # Refer to data loaded from file; extend over missing periods in the model
        # horizon
        key = c.add(
            ks[0] * "source",
            "extend_y",
            ks.base * "source" + "exo",
            "y::ikarus",
            strict=True,
        )

        if name in ("fix_cost", "inv_cost"):
            # Adjust for "availability". The IKARUS source gives these costs, and
            # simultaneously an "availability" in [length]. Implicitly, the costs are
            # those to construct/operate enough vehicles/infrastructure to provide
            # that amount of availability. E.g. a cost of 1000 EUR and an
            # availability of 10 km give a cost of 100 EUR / km.
            key = c.add(ks[1], "div", key, Key("ikarus availability", "tyc", "0"))
            # Adjust units
            key = c.add(ks[2], "mul", key, k_u)

        # Select desired values
        c.add(ks[3], "select", key, "ikarus indexers")
        key = c.add(ks[4], "rename_dims", ks[3], quote({"t_new": "t"}))

        if name == "input":
            # Apply scenario-specific input efficiency factor
            key = single_key(c.add("nonldv efficiency::adj", "mul", k_fi, key))

            # Drop existing "c" dimension
            key = single_key(c.add(key / "c", "drop_vars", key, quote("c")))

            # Fill (c, l) dimensions based on t
            key = c.add(ks[5], "mul", key, "broadcast:t-c-l")
        elif name == "technical_lifetime":
            # Round up technical_lifetime values due to incompatibility in handling
            # non-integer values in the GAMS code
            key = c.add(ks[5], "round", key)

        # Broadcast across "n" dimension
        key = c.add(ks[6], "mul", key, "n:n:ex world")

        if name in ("fix_cost", "input", "var_cost"):
            # Broadcast across valid (yv, ya) pairs
            key = c.add(ks[7], "mul", key, "broadcast:y-yv-ya")

        # Convert to target units
        try:
            target_units = quote(UNITS[name])
        except KeyError:  # "availability"
            pass
        else:
            key = c.add(ks[8], "convert_units", key, target_units)

        # Mapping between short dimension IDs in the computed quantities and the
        # dimensions in the respective MESSAGE parameters
        dims = dims_common.copy()
        dims.update(
            {
                "fix_cost": dict(year_act="ya", year_vtg="yv"),
                "input": dict(year_act="ya", year_vtg="yv", level="l"),
                "var_cost": dict(year_act="ya", year_vtg="yv"),
            }.get(name, dict(year_vtg="y"))
        )

        # Convert to message_ix-compatible data frames
        key = c.add(
            "as_message_df", f"transport nonldv {name}::ixmp", key, name, dims, common
        )

        if name in parameters:
            # The "availability" task would error, since it is not a MESSAGE parameter
            final[name] = key

    # Derive "output" data from "input"
    key = "transport nonldv output::ixmp"
    final["output"] = c.add(
        key, make_output, "transport nonldv input::ixmp", "t::transport"
    )

    # Merge all data together
    k_all = "transport nonldv::ixmp+ikarus"
    c.add(k_all, "merge_data", *final.values())

    # NB we do *not* call c.add("transport_data", ...) here; that is done in
    # .non_ldv.prepare_computer() only if IKARUS is the selected data source for
    # non-LDV data. Other derived quantities (emissions factors) are also prepared
    # there based on these outputs.
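

# Illustrative sketch, added for exposition (not part of the original module): the
# genno Key/KeySeq idioms used in prepare_computer(). A KeySeq hands out numbered
# variants of a base key, so successive steps in the chain of tasks for one
# parameter get distinct names.
def _demo_keyseq():
    ks = KeySeq("demo:c-t-y")
    k0 = ks[0]  # The base key tagged with "0", i.e. "demo:c-t-y:0"
    k_exo = ks.base + "exo"  # "+" appends a tag: "demo:c-t-y:exo"
    k_src = ks.base * "source"  # "*" adds the dimension "source" to the key
    return k0, k_exo, k_src
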

def get_ikarus_data(context) -> Dict[str, pd.DataFrame]:
    """Prepare non-LDV data from :cite:`Martinsen2006`.

    The data is read from :file:`GEAM_TRP_techinput.xlsx`, and the processed data is
    exported to :file:`non_LDV_techs_wrapped.csv`.

    .. note:: superseded by the computations set up by :func:`prepare_computer`.

    Parameters
    ----------
    context : .Context

    Returns
    -------
    data : dict of (str -> pandas.DataFrame)
        Keys are MESSAGE parameter names such as 'input', 'fix_cost'. Values are data
        frames ready for :meth:`~.Scenario.add_par`. Years in the data include the
        model horizon indicated by :attr:`.Config.base_model_info`, plus the
        additional year 2010.
    """
    # Reference to the transport configuration
    config: "Config" = context.transport
    tech_info = config.spec.add.set["technology"]
    info = config.base_model_info

    # Merge with base model commodity information for io_units() below
    # TODO this duplicates code in .ldv; move to a common location
    all_info = ScenarioInfo()
    all_info.set["commodity"].extend(get_codes("commodity"))
    all_info.update(config.spec.add)

    # Retrieve the data from the spreadsheet. Use additional output efficiency and
    # investment cost factors for some bus technologies
    data = read_ikarus_data(
        occupancy=config.non_ldv_output,  # type: ignore [attr-defined]
        k_output=config.efficiency["bus output"],
        k_inv_cost=config.cost["bus inv"],
    )

    # Create data frames to add the imported parameters to MESSAGEix

    # Vintage and active years from scenario info
    # Prepend years between 2010 and *firstmodelyear* so that values are saved
    missing_years = [x for x in info.set["year"] if (2010 <= x < info.y0)]
    vtg_years = missing_years + info.yv_ya["year_vtg"].tolist()
    act_years = missing_years + info.yv_ya["year_act"].tolist()

    # Default values to be used as args in make_df()
    defaults = dict(
        mode="all",
        year_act=act_years,
        year_vtg=vtg_years,
        time="year",
        time_origin="year",
        time_dest="year",
    )

    # Dict of ('parameter name' -> [list of data frames])
    dfs = defaultdict(list)

    # Iterate over each parameter and technology
    for (par, tec), group_data in data.groupby(["param", "technology"]):
        # Dict including the default values to be used as args in make_df()
        args = defaults.copy()
        args["technology"] = tec

        # Parameter-specific arguments/processing
        if par == "input":
            pass  # Handled by input_commodity_level(), below
        elif par == "output":
            # Get the mode for a technology
            mode = tech_info[tech_info.index(tec)].parent.id
            args.update(
                dict(commodity=f"transport pax {mode.lower()}", level="useful")
            )

        # Units, as an abbreviated string
        _units = group_data.apply(lambda x: x.units).unique()
        assert len(_units) == 1, "Units must be unique per (tec, par)"
        units = _units[0]
        args["unit"] = f"{units:~}"

        # Create data frame with values from *args*
        df = make_df(par, **args)

        # Assign input commodity and level according to the technology
        if par == "input":
            df = input_commodity_level(context, df, default_level="final")

        # Copy data into the 'value' column, by vintage year
        for (year, *_), value in group_data.items():
            df.loc[df["year_vtg"] == year, "value"] = value.magnitude

        # Drop duplicates. For parameters with a 'year_vtg' but no 'year_act'
        # dimension, the same year_vtg appears multiple times because of the contents
        # of *defaults*
        df.drop_duplicates(inplace=True)

        # Fill values for the remaining vintage years with the last value registered,
        # in this case for 2030
        df["value"] = df["value"].fillna(method="ffill")

        # Convert to the model's preferred input/output units for each commodity
        if par in ("input", "output"):
            target_units = df.apply(
                lambda row: all_info.io_units(
                    row["technology"], row["commodity"], row["level"]
                ),
                axis=1,
            ).unique()
            assert 1 == len(target_units)
        else:
            target_units = []

        if len(target_units):
            # FIXME improve convert_units() to handle more of these steps
            df["value"] = convert_units(
                df["value"], {"value": (1.0, units, target_units[0])}
            )
            df["unit"] = f"{target_units[0]:~}"

        # Round up technical_lifetime values due to incompatibility in handling
        # non-integer values in the GAMS code
        if par == "technical_lifetime":
            df["value"] = df["value"].round()

        # Broadcast across all nodes
        dfs[par].append(
            df.pipe(broadcast, node_loc=nodes_ex_world(info.N)).pipe(same_node)
        )

    # Concatenate data frames for each model parameter
    result = {par: pd.concat(list_of_df) for par, list_of_df in dfs.items()}

    # Capacity factors are all 1.0
    result.update(make_matched_dfs(result["output"], capacity_factor=1.0))
    result["capacity_factor"]["unit"] = ""

    if context.get("debug", False):
        # Directory for debug output (if any)
        debug_dir = context.get_local_path("debug")
        # Ensure the directory exists
        debug_dir.mkdir(parents=True, exist_ok=True)

        for name, df in result.items():
            target = debug_dir.joinpath(f"ikarus-{name}.csv")
            log.info(f"Dump data to {target}")
            df.to_csv(target, index=False)

    return result