import logging
from collections.abc import Mapping
from itertools import product

import numpy as np
import pandas as pd
from numpy.polynomial import Polynomial

from .config import Config
from .decay import project_ref_region_inv_costs_using_reduction_rates
from .gdp import adjust_cost_ratios_with_gdp
from .regional_differentiation import apply_regional_differentiation

# Module-level logger, named after this module; used by the functions below to report
# progress of the cost projection steps.
log = logging.getLogger(__name__)

def smaller_than(sequence, value):
    """Return a list of the items in `sequence` that are strictly less than `value`.

    The relative order of the items in `sequence` is preserved.
    """
    return list(filter(lambda candidate: candidate < value, sequence))

def larger_than(sequence, value):
    """Return a list of the items in `sequence` that are strictly greater than `value`.

    The relative order of the items in `sequence` is preserved.
    """
    return list(filter(lambda candidate: candidate > value, sequence))

def _maybe_query_scenario(df: pd.DataFrame, config: "Config") -> pd.DataFrame:
    """Filter `df` for :attr`.Config.scenario`, if any is specified."""
    if config.scenario == "all":
        scen = ["SSP1", "SSP2", "SSP3", "SSP4", "SSP5", "LED"]  # noqa: F841
        return df.query("scenario in @scen")
    elif config.scenario is not None:
        return df.query(f"scenario == {config.scenario.upper()!r}")
        return df

def _maybe_query_scenario_version(df: pd.DataFrame, config: "Config") -> pd.DataFrame:
    """Filter `df` for :attr`.Config.scenario_version`, if any is specified."""
    if config.scenario_version is None:
        return df

    # NB "all" does not appear in Config
    scen_vers = {  # noqa: F841
        "all": ["Review (2023)", "Previous (2013)"],
        "updated": ["Review (2023)"],
        "original": ["Previous (2013)"],

    return df.query("scenario_version in @scen_vers")

def create_projections_constant(config: "Config"):
    """Create cost projections assuming constant regional cost ratios.

    Parameters
    ----------
    config : .Config
        The function responds to, or passes on to other functions, the fields:
        :attr:`~.Config.base_year`,
        :attr:`~.Config.module`,
        :attr:`~.Config.node`,
        :attr:`~.Config.ref_region`, and
        :attr:`~.Config.scenario`.

    Returns
    -------
    df_costs : pd.DataFrame
        Dataframe containing the cost projections with the columns:

        - scenario_version: scenario version (for constant method, only
          "Not applicable")
        - scenario: scenario name (SSP1, SSP2, SSP3, SSP4, SSP5, or LED)
        - message_technology: technology name
        - first_technology_year: carried through from the merged input data
        - region: region name
        - year: year
        - inv_cost: investment cost
        - fix_cost: fixed operating and maintenance cost
    """
    log.info(f"Selected scenario: {config.scenario}")
    log.info(
        "For the constant method, only the SSP scenario(s) itself needs to be "
        "specified. No scenario version (previous vs. updated) is needed."
    )

    log.info("Calculate regional differentiation in base year+region")
    df_region_diff = apply_regional_differentiation(config)

    log.info("Apply cost reduction rates rates to reference region")
    df_ref_reg_decay = project_ref_region_inv_costs_using_reduction_rates(
        df_region_diff, config
    ).pipe(_maybe_query_scenario, config)

    df_costs = (
        df_region_diff.merge(df_ref_reg_decay, on="message_technology")
        .assign(
            # Up to and including the base year, use the regional base-year cost;
            # afterwards scale the projected reference-region cost by the (constant)
            # regional cost ratio.
            inv_cost=lambda x: np.where(
                x.year <= config.base_year,
                x.reg_cost_base_year,
                x.inv_cost_ref_region_decay * x.reg_cost_ratio,
            ),
            # `x.inv_cost` here is the column computed just above
            fix_cost=lambda x: x.inv_cost * x.fix_ratio,
            scenario_version="Not applicable",
        )
        .reindex(
            [
                "scenario_version",
                "scenario",
                "message_technology",
                "first_technology_year",
                "region",
                "year",
                "inv_cost",
                "fix_cost",
            ],
            axis=1,
        )
        .drop_duplicates()
    )

    return df_costs
def create_projections_gdp(config: "Config"):
    """Create cost projections using the GDP method.

    Parameters
    ----------
    config : .Config
        The function responds to, or passes on to other functions, the fields:
        :attr:`~.Config.base_year`,
        :attr:`~.Config.module`,
        :attr:`~.Config.node`,
        :attr:`~.Config.ref_region`,
        :attr:`~.Config.scenario`, and
        :attr:`~.Config.scenario_version`.

    Returns
    -------
    df_costs : pd.DataFrame
        Dataframe containing the cost projections with the columns:

        - scenario_version: scenario version (for gdp method, either
          "Review (2023)" or "Previous (2013)")
        - scenario: scenario name (SSP1, SSP2, SSP3, SSP4, SSP5, or LED)
        - message_technology: technology name
        - first_technology_year: carried through from the merged input data
        - region: region name
        - year: year
        - inv_cost: investment cost
        - fix_cost: fixed operating and maintenance cost
    """
    # Print selection of scenario version and scenario
    log.info(f"Selected scenario: {config.scenario}")
    log.info(f"Selected scenario version: {config.scenario_version}")

    log.info("Calculate regional differentiation in base year+region")
    df_region_diff = apply_regional_differentiation(config)

    log.info("Apply cost reduction rates to reference region")
    df_ref_reg_reduction = project_ref_region_inv_costs_using_reduction_rates(
        df_region_diff, config
    ).pipe(_maybe_query_scenario, config)

    log.info("Adjust ratios using GDP data")
    # - Compute adjustment
    # - Filter by Config.scenario, if given.
    # - Filter by Config.scenario_version, if given.
    df_adj_cost_ratios = (
        adjust_cost_ratios_with_gdp(df_region_diff, config)
        .pipe(_maybe_query_scenario, config)
        .pipe(_maybe_query_scenario_version, config)
    )

    df_costs = (
        df_region_diff.merge(df_ref_reg_reduction, on="message_technology")
        .merge(
            df_adj_cost_ratios, on=["scenario", "message_technology", "region", "year"]
        )
        .assign(
            # Up to and including the base year, use the regional base-year cost;
            # afterwards scale the projected reference-region cost by the
            # GDP-adjusted regional cost ratio.
            inv_cost=lambda x: np.where(
                x.year <= config.base_year,
                x.reg_cost_base_year,
                x.inv_cost_ref_region_decay * x.reg_cost_ratio_adj,
            ),
            # `x.inv_cost` here is the column computed just above
            fix_cost=lambda x: x.inv_cost * x.fix_ratio,
        )
        .reindex(
            [
                "scenario_version",
                "scenario",
                "message_technology",
                "first_technology_year",
                "region",
                "year",
                "inv_cost",
                "fix_cost",
            ],
            axis=1,
        )
        .drop_duplicates()
    )

    return df_costs
def create_projections_converge(config: "Config"):
    """Create cost projections using the convergence method.

    Parameters
    ----------
    config : .Config
        The function responds to, or passes on to other functions, the fields:
        :attr:`~.Config.base_year`,
        :attr:`~.Config.convergence_year`,
        :attr:`~.Config.module`,
        :attr:`~.Config.node`,
        :attr:`~.Config.ref_region`, and
        :attr:`~.Config.scenario`.

    Returns
    -------
    df_costs : pd.DataFrame
        Dataframe containing the cost projections with the columns:

        - scenario_version: scenario version (for convergence method, only
          "Not applicable")
        - scenario: scenario name (SSP1, SSP2, SSP3, SSP4, SSP5, or LED)
        - message_technology: technology name
        - first_technology_year: carried through from the merged input data
        - region: region name
        - year: year
        - inv_cost: investment cost
        - fix_cost: fixed operating and maintenance cost
    """
    log.info(f"Selected scenario: {config.scenario}")
    log.info(f"Selected convergence year: {config.convergence_year}")
    log.info(
        "For the convergence method, only the SSP scenario(s) itself needs to be "
        "specified. No scenario version (previous vs. updated) is needed."
    )

    log.info("Calculate regional differentiation in base year+region")
    df_region_diff = apply_regional_differentiation(config)

    log.info("Apply cost reduction rates to reference region")
    df_ref_reg_cost_reduction = project_ref_region_inv_costs_using_reduction_rates(
        df_region_diff, config
    ).pipe(_maybe_query_scenario, config)

    # Temporary costs:
    # - up to and including the base year: the regional base-year cost;
    # - before the convergence year: reference-region cost × regional cost ratio;
    # - from the convergence year on: the reference-region cost itself.
    df_tmp_costs = (
        df_region_diff.merge(df_ref_reg_cost_reduction, on="message_technology")
        .assign(
            inv_cost_tmp=lambda x: np.where(
                x.year <= config.base_year,
                x.reg_cost_base_year,
                np.where(
                    x.year < config.convergence_year,
                    x.inv_cost_ref_region_decay * x.reg_cost_ratio,
                    x.inv_cost_ref_region_decay,
                ),
            ),
        )
        .drop_duplicates()
    )

    y_predict = np.array(config.seq_years)
    y_index = pd.Index(config.seq_years, name="year")

    def _predict(df: pd.DataFrame) -> pd.DataFrame:
        """Fit a polynomial to `df` and predict for :attr:`.seq_years`.

        NOTE(review): the earlier docstring described a degree-3 fit, while the code
        passes ``deg=1``. The behaviour of the code is kept here; confirm which
        degree is intended.
        """
        # Fit inv_cost_tmp as a function of year
        p = Polynomial.fit(df.year, df.inv_cost_tmp, deg=1)

        # - Predict using config.seq_years.
        # - Assemble a single-column data frame with "year" as the index name.
        return pd.DataFrame({"inv_pre_converge_decay": p(y_predict)}, index=y_index)

    # Columns for grouping and merging
    cols = ["scenario", "message_technology", "region"]

    # Apply polynomial regression to costs at base year and convergence year
    # (interpolating)
    df_pre_converge_costs = (
        df_tmp_costs.query(
            "year == @config.base_year or year == @config.convergence_year"
        )
        .groupby(cols, group_keys=True)
        .apply(_predict)
        .reset_index()
    )

    # Get final investment costs
    df_inv_costs_final = (
        df_tmp_costs.merge(df_pre_converge_costs, on=cols + ["year"])
        .assign(
            # - Up to and including the base year: the regional base-year cost.
            # - Reference region: always its own projected cost.
            # - Other regions: the fitted value before the convergence year, then
            #   the reference-region cost.
            inv_cost_converge=lambda x: np.where(
                x.year <= config.base_year,
                x.reg_cost_base_year,
                np.where(
                    x.region == config.ref_region,
                    x.inv_cost_ref_region_decay,
                    np.where(
                        x.year < config.convergence_year,
                        x.inv_pre_converge_decay,
                        x.inv_cost_ref_region_decay,
                    ),
                ),
            ),
        )
        .drop_duplicates()
    )

    # Get fixed O&M costs
    df_costs = (
        df_inv_costs_final.rename(columns={"inv_cost_converge": "inv_cost"})
        .assign(
            fix_cost=lambda x: x.inv_cost * x.fix_ratio,
            scenario_version="Not applicable",
        )
        .reindex(
            [
                "scenario_version",
                "scenario",
                "message_technology",
                "first_technology_year",
                "region",
                "year",
                "inv_cost",
                "fix_cost",
            ],
            axis=1,
        )
        .drop_duplicates()
    )

    return df_costs
def create_message_outputs(
    df_projections: pd.DataFrame, config: "Config"
) -> tuple[pd.DataFrame, pd.DataFrame]:
    """Create MESSAGEix outputs for investment and fixed costs.

    The returned data have the model periods given by :attr:`.Config.Y`.

    Parameters
    ----------
    df_projections : pd.DataFrame
        Dataframe containing the cost projections for each technology, with the
        columns scenario_version, scenario, message_technology,
        first_technology_year, region, year, inv_cost, and fix_cost. Output of one of
        :func:`create_projections_constant`, :func:`create_projections_gdp`, or
        :func:`create_projections_converge`.
    config : .Config
        The function responds to the fields
        :attr:`~.Config.base_year`,
        :attr:`~.Config.fom_rate`,
        :attr:`~.Config.seq_years`,
        :attr:`~.Config.use_vintages`, and
        :attr:`~.Config.Y`.

    Returns
    -------
    inv : pd.DataFrame
        Dataframe containing investment costs, with columns scenario_version,
        scenario, node_loc, technology, year_vtg, value, and unit.
    fom : pd.DataFrame
        Dataframe containing fixed operating and maintenance costs, with the same
        columns plus year_act.
    """
    log.info("Convert {fix,inv}_cost data to MESSAGE structure")

    y_base = config.base_year

    dims = [
        "scenario_version",
        "scenario",
        "message_technology",
        "first_technology_year",
        "region",
        "year",
    ]
    # Full Cartesian product of the unique labels on every dimension and all years
    df_prod = pd.DataFrame(
        product(
            df_projections.scenario_version.unique(),
            df_projections.scenario.unique(),
            df_projections.message_technology.unique(),
            df_projections.first_technology_year.unique(),
            df_projections.region.unique(),
            config.seq_years,
        ),
        columns=dims,
    )

    # NOTE(review): the literal 2020 is used to select these values, while the
    # comparisons below use `config.base_year`; the two only coincide when
    # base_year is 2020 — confirm.
    val_2020 = (
        df_projections.query("year == 2020")
        .rename(columns={"inv_cost": "inv_cost_2020", "fix_cost": "fix_cost_2020"})
        .drop(columns=["year"])
    )

    val_2100 = (
        df_projections.query("year == 2100")
        .drop(columns=["year"])
        .rename(columns={"inv_cost": "inv_cost_2100", "fix_cost": "fix_cost_2100"})
    )

    df_merge = (
        (
            # The two inner merges drop combinations from `df_prod` that have no
            # 2020 or 2100 value; the left merge keeps years without projections.
            df_prod.merge(val_2020, on=dims[:-1])
            .merge(val_2100, on=dims[:-1])
            .merge(df_projections, on=dims, how="left")
        )
        .assign(
            # Years up to and including the base year take the 2020 values
            inv_cost=lambda x: np.where(x.year <= y_base, x.inv_cost_2020, x.inv_cost),
            fix_cost=lambda x: np.where(x.year <= y_base, x.fix_cost_2020, x.fix_cost),
        )
        .assign(
            # NOTE: This portion carries over the 2100 values to years beyond 2100.
            # This is applicable in the case where Config.final_year > 2100.
            inv_cost=lambda x: np.where(x.year >= 2100, x.inv_cost_2100, x.inv_cost),
            fix_cost=lambda x: np.where(x.year >= 2100, x.fix_cost_2100, x.fix_cost),
        )
        .drop(
            columns=["inv_cost_2020", "fix_cost_2020", "inv_cost_2100", "fix_cost_2100"]
        )
        .rename(columns={"year": "year_vtg"})
        .drop_duplicates()
    )

    dtypes = dict(
        scenario_version=str,
        scenario=str,
        node_loc=str,
        technology=str,
        first_technology_year=str,
        unit=str,
        year_vtg=int,
        value=float,
    )

    inv = (
        df_merge.copy()
        .assign(unit="USD/kWa")
        .rename(
            columns={
                "inv_cost": "value",
                "message_technology": "technology",
                "region": "node_loc",
            }
        )
        .reindex(
            [
                "scenario_version",
                "scenario",
                "node_loc",
                "technology",
                "first_technology_year",
                "year_vtg",
                "value",
                "unit",
            ],
            axis=1,
        )
        .astype(dtypes)
        .query("year_vtg in @config.Y")
        .astype({"first_technology_year": float})  # has to be float; int gives error
        # Discard vintages before the technology's first year
        .query("year_vtg >= first_technology_year")
        .reset_index(drop=True)
        .drop_duplicates()
        .drop("first_technology_year", axis=1)
    )

    # The fixed cost data additionally have a "year_act" column
    dtypes.update(year_act=int)

    # Single-column frame holding whichever of year_act/year_vtg is *not* already in
    # the renamed `df_merge` below; the constant "key" column allows a cross join.
    to_merge = pd.DataFrame(
        {"year_act" if config.use_vintages else "year_vtg": config.seq_years}
    ).assign(key=1)

    def _compute_value(df: pd.DataFrame) -> pd.Series:
        """Return the fixed cost for each row of `df`."""
        if not config.use_vintages:
            return df.fix_cost

        # Growth factor per year between vintage (or base) year and activity year
        rate = 1.0 + config.fom_rate

        return np.where(
            df.year_vtg <= y_base,
            np.where(
                df.year_act <= y_base,
                df.fix_cost,
                # NB if fom_rate was 0, the latter terms collapse to 1.0 ** (…) = 1.0
                df.fix_cost * rate ** (df.year_act - y_base),
            ),
            df.fix_cost * rate ** (df.year_act - df.year_vtg),
        )

    fom = (
        df_merge.copy()
        .drop(columns=["inv_cost"])
        .rename(columns={"year_vtg": "year_vtg" if config.use_vintages else "year_act"})
        .assign(key=1)
        .merge(to_merge, on="key")
        .drop(columns=["key"])
        .query("year_act >= year_vtg")
        .assign(value=_compute_value, unit="USD/kWa")
        .rename(columns={"message_technology": "technology", "region": "node_loc"})
        .reindex(
            [
                "scenario_version",
                "scenario",
                "node_loc",
                "technology",
                "first_technology_year",
                "year_vtg",
                "year_act",
                "value",
                "unit",
            ],
            axis=1,
        )
        .astype(dtypes)
        .query("year_act in @config.Y and year_vtg in @config.Y")
        .astype({"first_technology_year": float})  # has to be float; int gives error
        # Discard vintages before the technology's first year
        .query("year_vtg >= first_technology_year")
        .reset_index(drop=True)
        .drop_duplicates()
        .drop("first_technology_year", axis=1)
    )

    return inv, fom
def create_iamc_outputs(
    msg_inv: pd.DataFrame, msg_fix: pd.DataFrame
) -> tuple[pd.DataFrame, pd.DataFrame]:
    """Create IAMC outputs for investment and fixed costs.

    Parameters
    ----------
    msg_inv : pd.DataFrame
        Dataframe containing investment costs in MESSAGEix format, with the columns
        scenario_version, scenario, node_loc, technology, year_vtg, value, and unit.
        Output of :func:`create_message_outputs`.
    msg_fix : pd.DataFrame
        Dataframe containing fixed operating and maintenance costs in MESSAGEix
        format, with the same columns plus year_act. Output of
        :func:`create_message_outputs`.

    Returns
    -------
    iamc_inv : pd.DataFrame
        Dataframe containing investment costs in IAMC format: one row per
        combination of SSP_Scenario_Version, SSP_Scenario, Region, Variable, and
        Unit, and one column per year (from year_vtg).
    iamc_fix : pd.DataFrame
        Dataframe containing fixed operating and maintenance costs in IAMC format,
        with the same layout; the year columns come from year_act, and the vintage
        year is encoded in the Variable.
    """
    log.info("Convert {fix,inv}_cost data to IAMC structure")

    iamc_inv = (
        (
            msg_inv.assign(
                # The technology name becomes the last part of the variable name
                Variable=lambda x: "Capital Cost|Electricity|" + x.technology,
            )
            .rename(
                columns={
                    "scenario_version": "SSP_Scenario_Version",
                    "scenario": "SSP_Scenario",
                    "year_vtg": "Year",
                    "node_loc": "Region",
                    "unit": "Unit",
                }
            )
            .drop(columns=["technology"])
        )
        # Long to wide: one column per year
        .pivot_table(
            index=[
                "SSP_Scenario_Version",
                "SSP_Scenario",
                "Region",
                "Variable",
                "Unit",
            ],
            columns="Year",
            values="value",
        )
        .reset_index()
        .rename_axis(None, axis=1)
        .drop_duplicates()
    )

    iamc_fix = (
        (
            msg_fix.assign(
                # Both the technology name and the vintage year are encoded in the
                # variable name; the columns themselves are dropped below.
                Variable=lambda x: "OM Cost|Electricity|"
                + x.technology
                + "|Vintage="
                + x.year_vtg.astype(str),
            )
            .rename(
                columns={
                    "scenario_version": "SSP_Scenario_Version",
                    "scenario": "SSP_Scenario",
                    "year_act": "Year",
                    "node_loc": "Region",
                    "unit": "Unit",
                }
            )
            .drop(columns=["technology", "year_vtg"])
        )
        # Long to wide: one column per year
        .pivot_table(
            index=[
                "SSP_Scenario_Version",
                "SSP_Scenario",
                "Region",
                "Variable",
                "Unit",
            ],
            columns="Year",
            values="value",
        )
        .reset_index()
        .rename_axis(None, axis=1)
        .drop_duplicates()
    )

    return iamc_inv, iamc_fix
def create_cost_projections(config: "Config") -> Mapping[str, pd.DataFrame]:
    """Get investment and fixed cost projections.

    This is the main function to get investment and fixed cost projections. It calls
    the other functions in this module, and returns the projections in the specified
    format.

    Parameters
    ----------
    config : .Config
        The function responds to, or passes on to other functions, the fields:
        :attr:`~.Config.base_year`,
        :attr:`~.Config.convergence_year`,
        :attr:`~.Config.fom_rate`,
        :attr:`~.Config.format`,
        :attr:`~.Config.method`,
        :attr:`~.Config.module`,
        :attr:`~.Config.node`,
        :attr:`~.Config.ref_region`,
        :attr:`~.Config.scenario`, and
        :attr:`~.Config.scenario_version`.

    Returns
    -------
    dict
        Keys are "fix_cost" and "inv_cost", each mapped to a
        :class:`~.pandas.DataFrame`.

        If :attr:`.Config.format` is "message", the data frames have the same columns
        as required by :mod:`message_ix` for the respective parameter—for instance,
        the columns given by :py:`make_df("fix_cost", ...)`—*plus* columns named
        "scenario" and "scenario_version".

        If :attr:`.Config.format` is "iamc", the data frames are the output of
        :func:`create_iamc_outputs`.

    Raises
    ------
    KeyError
        If :attr:`.Config.method` is not one of "convergence", "gdp", or "constant"
        (presumably :meth:`.Config.check` rejects such values first; that method is
        not visible here).
    """
    # Validate configuration
    config.check()

    # Display configuration using the default __repr__ provided by @dataclass
    log.info(f"Configuration: {config!r}")

    # Select function according to `config.method`
    func = {
        "convergence": create_projections_converge,
        "gdp": create_projections_gdp,
        "constant": create_projections_constant,
    }[config.method]

    # Create projections
    df_costs = func(config)

    # Convert to MESSAGEix format
    df_inv, df_fom = create_message_outputs(df_costs, config)

    # Optionally convert further, from MESSAGEix format to IAMC format
    if config.format == "iamc":
        df_inv, df_fom = create_iamc_outputs(df_inv, df_fom)

    return {"inv_cost": df_inv, "fix_cost": df_fom}