from ast import literal_eval
from typing import TYPE_CHECKING
import pandas as pd
import yaml
from message_ix import make_df
import message_ix_models.util
from message_ix_models.model.material.material_demand import material_demand_calc
from message_ix_models.model.material.util import read_config
from message_ix_models.util import broadcast, same_node
if TYPE_CHECKING:
from message_ix import Scenario
ssp_mode_map = {
"SSP1": "CTS core",
"SSP2": "RTS core",
"SSP3": "RTS high",
"SSP4": "CTS high",
"SSP5": "RTS high",
"LED": "CTS core", # TODO: move to even lower projection
}
iea_elasticity_map = {
"CTS core": (1.2, 0.25),
"CTS high": (1.3, 0.48),
"RTS core": (1.25, 0.35),
"RTS high": (1.4, 0.54),
}
[docs]def gen_data_methanol(scenario: "Scenario") -> dict[str, pd.DataFrame]:
"""
Generates data for methanol industry model
Parameters
----------
scenario: .Scenario
"""
context = read_config()
df_pars = pd.read_excel(
message_ix_models.util.package_data_path(
"material", "methanol", "methanol_sensitivity_pars.xlsx"
),
sheet_name="Sheet1",
dtype=object,
)
pars = df_pars.set_index("par").to_dict()["value"]
if pars["mtbe_scenario"] == "phase-out":
pars_dict = pd.read_excel(
message_ix_models.util.package_data_path(
"material", "methanol", "methanol_techno_economic.xlsx"
),
sheet_name=None,
dtype=object,
)
else:
pars_dict = pd.read_excel(
message_ix_models.util.package_data_path(
"material", "methanol", "methanol_techno_economic_high_demand.xlsx"
),
sheet_name=None,
dtype=object,
)
for i in pars_dict.keys():
pars_dict[i] = unpivot_input_data(pars_dict[i], i)
# TODO: only temporary hack to ensure SSP_dev compatibility
if "SSP_dev" in scenario.model:
file_path = message_ix_models.util.package_data_path(
"material", "methanol", "missing_rels.yaml"
)
with open(file_path, "r") as file:
missing_rels = yaml.safe_load(file)
df = pars_dict["relation_activity"]
pars_dict["relation_activity"] = df[~df["relation"].isin(missing_rels)]
default_gdp_elasticity_2020, default_gdp_elasticity_2030 = iea_elasticity_map[
ssp_mode_map[context["ssp"]]
]
df_final = material_demand_calc.gen_demand_petro(
scenario, "methanol", default_gdp_elasticity_2020, default_gdp_elasticity_2030
)
df_final["value"] = df_final["value"].apply(
lambda x: x * pars["methanol_resid_demand_share"]
)
pars_dict["demand"] = df_final
return pars_dict
[docs]def broadcast_nodes(
df_bc_node: pd.DataFrame,
df_final: pd.DataFrame,
node_cols: list[str],
node_cols_codes: dict[str, pd.Series],
i: int,
) -> pd.DataFrame:
"""
Broadcast nodes that were stored in pivoted row
Parameters
----------
df_bc_node: pd.DataFrame
df_final: pd.DataFrame
node_cols: list[str]
node_cols_codes: dict[str, pd.Series]
i: int
"""
if len(node_cols) == 1:
if "node_loc" in node_cols:
df_bc_node = df_bc_node.pipe(
broadcast, node_loc=node_cols_codes["node_loc"]
)
if "node_vtg" in node_cols:
df_bc_node = df_bc_node.pipe(
broadcast, node_vtg=node_cols_codes["node_vtg"]
)
if "node_rel" in node_cols:
df_bc_node = df_bc_node.pipe(
broadcast, node_rel=node_cols_codes["node_rel"]
)
if "node" in node_cols:
df_bc_node = df_bc_node.pipe(broadcast, node=node_cols_codes["node"])
if "node_share" in node_cols:
df_bc_node = df_bc_node.pipe(
broadcast, node_share=node_cols_codes["node_share"]
)
else:
df_bc_node = df_bc_node.pipe(broadcast, node_loc=node_cols_codes["node_loc"])
if len(df_final.loc[i][node_cols].T.unique()) == 1:
# df_bc_node["node_rel"] = df_bc_node["node_loc"]
df_bc_node = df_bc_node.pipe(
same_node
) # not working for node_rel in installed message_ix_models version
else:
if "node_rel" in list(df_bc_node.columns):
df_bc_node = df_bc_node.pipe(
broadcast, node_rel=node_cols_codes["node_rel"]
)
if "node_origin" in list(df_bc_node.columns):
df_bc_node = df_bc_node.pipe(
broadcast, node_origin=node_cols_codes["node_origin"]
)
if "node_dest" in list(df_bc_node.columns):
df_bc_node = df_bc_node.pipe(
broadcast, node_dest=node_cols_codes["node_dest"]
)
return df_bc_node
[docs]def broadcast_years(
df_bc_node: pd.DataFrame,
yr_col_out: list[str],
yr_cols_codes: dict[str, list[str]],
col: str,
) -> pd.DataFrame:
"""
Broadcast years that were stored in pivoted row
Parameters
----------
df_bc_node: pd.DataFrame
yr_col_out: list[str]
yr_cols_codes: ict[str, list[str]]
col: str
"""
if len(yr_col_out) == 1:
yr_list = [i[0] for i in yr_cols_codes[col]]
# print(yr_list)
if "year_act" in yr_col_out:
df_bc_node = df_bc_node.pipe(broadcast, year_act=yr_list)
if "year_vtg" in yr_col_out:
df_bc_node = df_bc_node.pipe(broadcast, year_vtg=yr_list)
if "year_rel" in yr_col_out:
df_bc_node = df_bc_node.pipe(broadcast, year_rel=yr_list)
if "year" in yr_col_out:
df_bc_node = df_bc_node.pipe(broadcast, year=yr_list)
df_bc_node[yr_col_out] = df_bc_node[yr_col_out].astype(int)
else:
if "year_vtg" in yr_col_out:
y_v = [str(i) for i in yr_cols_codes[col]]
df_bc_node = df_bc_node.pipe(broadcast, year_vtg=y_v)
df_bc_node["year_act"] = [
literal_eval(i)[1] for i in df_bc_node["year_vtg"]
]
df_bc_node["year_vtg"] = [
literal_eval(i)[0] for i in df_bc_node["year_vtg"]
]
if "year_rel" in yr_col_out:
if "year_act" in yr_col_out:
df_bc_node = df_bc_node.pipe(
broadcast, year_act=[i[0] for i in yr_cols_codes[col]]
)
df_bc_node["year_rel"] = df_bc_node["year_act"]
return df_bc_node