import logging
from pathlib import Path
from typing import TYPE_CHECKING, Callable, List, Literal, Union
import numpy as np
import pandas as pd
import scipy.optimize as opt
import yaml
from message_ix import make_df
from scipy.optimize import curve_fit
import message_ix_models.util
from message_ix_models import Context, ScenarioInfo
from message_ix_models.model.material.data_util import get_ssp_soc_eco_data
from message_ix_models.util import package_data_path
from .config import FittingConfig
from .math_util import (
GIGA,
MEGA,
cement_function,
gompertz,
steel_function,
)
if TYPE_CHECKING:
from message_ix import Scenario
# Filename (within the "material/other" package data directory) of the default
# exogenous GDP (PPP) timeseries, used when a scenario provides no GDP data.
file_gdp = "/iamc_db ENGAGE baseline GDP PPP.xlsx"
log = logging.getLogger(__name__)
# Multipliers applied positionally to the fitted demand-function parameters
# ("a", "b", ...) of each material, keyed by demand mode. The mode for a given
# SSP is selected via ssp_mode_map below.
mode_modifiers_dict: dict[str, dict[str, dict[str, float]]] = {
    "low": {
        "steel": {"a": 0.85, "b": 0.95},
        "cement": {"a": 0.8},
        "aluminum": {"a": 0.85, "b": 0.95},
    },
    "normal": {
        "steel": {"a": 1, "b": 1},
        "cement": {"a": 1},
        "aluminum": {"a": 1, "b": 1},
    },
    "high": {
        "steel": {"a": 1.3, "b": 1},
        "cement": {"a": 1.2, "b": 1.5},
        "aluminum": {"a": 1.4, "b": 1.3},
    },
    "highest": {
        "steel": {"a": 1.3, "b": 1},
        "cement": {"a": 1.8, "b": 4},
        "aluminum": {"a": 1.3, "b": 1},
    },
}
# Per-material location of input data within the "material" package data
# directory. Only cement/steel/aluminum have dedicated historical data files.
material_data = {
    "aluminum": {"dir": "aluminum", "file": "/demand_aluminum.xlsx"},
    "steel": {"dir": "steel", "file": "/STEEL_database_2012.xlsx"},
    "cement": {"dir": "cement", "file": "/CEMENT.BvR2010.xlsx"},
    "HVC": {"dir": "petrochemicals"},
    "NH3": {"dir": "ammonia"},
    "methanol": {"dir": "methanol"},
}
# Maps an SSP/LED scenario name to the demand mode used in mode_modifiers_dict.
ssp_mode_map = {
    "SSP1": "low",
    "SSP2": "normal",
    "SSP3": "high",
    "SSP4": "normal",
    "SSP5": "highest",
    "LED": "low",
}
# Regression setup per material: functional form, curve_fit initial guess,
# predictor columns (x_data), and Gompertz convergence parameters (phi, mu)
# consumed by project_demand.
fitting_dict = {
    "steel": {
        "function": steel_function,
        "initial_guess": [600, -10000, 0],
        "x_data": ["gdp_pcap", "del_t"],
        "phi": 9,
        "mu": 0.1,
    },
    "cement": {
        "function": cement_function,
        "initial_guess": [500, -3000],
        "x_data": ["gdp_pcap"],
        "phi": 10,
        "mu": 0.1,
    },
    "aluminum": {
        "function": cement_function,
        "initial_guess": [600, -10000],
        "x_data": ["gdp_pcap"],
        "phi": 9,
        "mu": 0.1,
    },
}
def read_timer_pop(
    datapath: Union[str, Path], material: Literal["cement", "steel", "aluminum"]
):
    """Read population data for a given material from TIMER model.

    Parameters
    ----------
    datapath
        Directory containing the TIMER data files.
    material
        Material whose data file holds the "Timer_POP" sheet.

    Returns
    -------
    pd.DataFrame
        Long-format population data with columns: region, reg_no, year, pop.
    """
    entry = material_data[material]
    # The skipped rows and 26-row window strip sheet layout artifacts,
    # keeping only the regional data rows.
    raw = pd.read_excel(
        f"{datapath}/{entry['dir']}{entry['file']}",
        sheet_name="Timer_POP",
        skiprows=[0, 1, 2, 30],
        nrows=26,
    )
    tidy = raw.drop(columns=["Unnamed: 0"]).rename(
        columns={"Unnamed: 1": "reg_no", "Unnamed: 2": "region"}
    )
    return tidy.melt(id_vars=["region", "reg_no"], var_name="year", value_name="pop")
def read_timer_gdp(
    datapath: Union[str, Path], material: Literal["cement", "steel", "aluminum"]
):
    """Read GDP per capita data for a given material from TIMER Excel files.

    Parameters
    ----------
    datapath
        Directory containing the TIMER data files.
    material
        Material whose data file holds the "Timer_GDPCAP" sheet.

    Returns
    -------
    pd.DataFrame
        Long-format GDP per capita data with columns: region, reg_no, year,
        gdp_pcap.
    """
    entry = material_data[material]
    # The skipped rows and 26-row window strip sheet layout artifacts,
    # keeping only the regional data rows.
    raw = pd.read_excel(
        f"{datapath}/{entry['dir']}{entry['file']}",
        sheet_name="Timer_GDPCAP",
        skiprows=[0, 1, 2, 30],
        nrows=26,
    )
    tidy = raw.drop(columns=["Unnamed: 0"]).rename(
        columns={"Unnamed: 1": "reg_no", "Unnamed: 2": "region"}
    )
    return tidy.melt(
        id_vars=["region", "reg_no"], var_name="year", value_name="gdp_pcap"
    )
def project_demand(df: pd.DataFrame, phi: float, mu: float):
    """Project material demand over time using a convergence function.

    Applies a Gompertz convergence curve per region so that the regression
    estimate of per-capita demand converges towards the level implied by the
    base-year total demand and population.

    Parameters
    ----------
    df
        Historical demand data with columns "demand.tot.base" (base-year total
        demand), "pop.mil" (population in millions), "demand_pcap0" (initial
        per-capita demand) and "year".
    phi
        Shape parameter for the Gompertz function.
    mu
        Rate parameter for the Gompertz function.

    Returns
    -------
    pd.DataFrame
        Projected demand with columns: region, year, demand_tot.
    """

    def _project(group: pd.DataFrame) -> pd.DataFrame:
        # Per-capita demand implied by base-year totals (first row per region).
        pcap_base = (
            group["demand.tot.base"].iloc[0] * GIGA / group["pop.mil"].iloc[0] / MEGA
        )
        # Base-year gap between implied and regression-estimated per-capita demand.
        gap = pcap_base - group["demand_pcap0"].iloc[0]
        # Close the gap over time along the Gompertz convergence curve.
        pcap = group["demand_pcap0"] + gap * gompertz(phi, mu, y=group["year"])
        return group.assign(
            demand_pcap_base=pcap_base,
            gap_base=gap,
            demand_pcap=pcap,
            demand_tot=pcap * group["pop.mil"] * MEGA / GIGA,
        )

    projected = (
        df.groupby("region", group_keys=False)[df.columns]
        .apply(_project)
        .reset_index(drop=True)
    )
    return projected[["region", "year", "demand_tot"]]
def read_base_demand(filepath: Union[str, Path]):
    """Read base year demand data from a YAML file.

    Parameters
    ----------
    filepath
        Path to the YAML file containing base demand data.

    Returns
    -------
    pd.DataFrame
        DataFrame with columns: region, year, value.
    """
    # Stream the file straight into the parser instead of reading it into a
    # string first.
    with open(filepath, "r") as file:
        data_list = yaml.safe_load(file)
    # Each list entry maps a region name to a {"year": ..., "value": ...} record.
    return pd.DataFrame(
        [
            (region, record["year"], record["value"])
            for entry in data_list
            for region, record in entry.items()
        ],
        columns=["region", "year", "value"],
    )
def read_hist_mat_demand(material: Literal["cement", "steel", "aluminum"]):
    """Read historical material demand data for a specified commodity.

    This function retrieves historical data on material consumption, population, and
    GDP for the specified material. It processes the data from various sources and
    formats it into a consistent structure.

    Parameters
    ----------
    material
        Commodity for which historical demand data is being read.

    Returns
    -------
    pd.DataFrame
        DataFrame containing historical material demand data with the following
        columns:

        - region: Region name.
        - reg_no: Region number.
        - year: Year of the data.
        - consumption: Material consumption.
        - pop: Population data.
        - gdp_pcap: GDP per capita.
        - cons_pcap: Consumption per capita.
        - del_t: Time difference from the base year (2010).
    """
    datapath = message_ix_models.util.package_data_path("material")
    if material in ["cement", "steel"]:
        # These sheets lack pop/gdp columns; read them from the TIMER sheets.
        df_pop = read_timer_pop(datapath, material)
        df_gdp = read_timer_gdp(datapath, material)
    if material == "aluminum":
        # The aluminum sheet already contains pop and gdp per capita columns.
        df_raw_cons = (
            pd.read_excel(
                f"{datapath}/{material_data[material]['dir']}{material_data[material]['file']}",
                sheet_name="final_table",
                nrows=378,
            )
            .drop(["cons.pcap", "del.t"], axis=1)
            .rename({"gdp.pcap": "gdp_pcap"}, axis=1)
        )
        df_cons = (
            df_raw_cons.assign(
                cons_pcap=lambda x: x["consumption"] / x["pop"],
                del_t=lambda x: x["year"].astype(int) - 2010,
            )
            .dropna()
            .query("cons_pcap > 0")
        )
    elif material == "steel":
        df_raw_cons = pd.read_excel(
            f"{datapath}/{material_data[material]['dir']}{material_data[material]['file']}",
            sheet_name="Consumption regions",
            nrows=26,
        )
        df_raw_cons = (
            df_raw_cons.rename({"t": "region"}, axis=1)
            .drop(columns=["Unnamed: 1"])
            .rename({"TIMER Region": "reg_no"}, axis=1)
        )
        df_raw_cons = df_raw_cons.melt(
            id_vars=["region", "reg_no"], var_name="year", value_name="consumption"
        )
        # Clean region labels: strip parentheses and digits, then drop the
        # trailing leftover character (presumably a separator before the region
        # number — TODO confirm against the source sheet).
        df_raw_cons["region"] = (
            df_raw_cons["region"]
            .str.replace("(", "", regex=False)
            .str.replace(")", "", regex=False)
            # Fix: pandas >= 2.0 defaults str.replace to regex=False, which
            # made this digit pattern a literal (no-op) match.
            .str.replace(r"\d+", "", regex=True)
            .str[:-1]
        )
        # Merge and organize data
        df_cons = (
            pd.merge(df_raw_cons, df_pop.drop("region", axis=1), on=["reg_no", "year"])
            .merge(df_gdp[["reg_no", "year", "gdp_pcap"]], on=["reg_no", "year"])
            .assign(
                cons_pcap=lambda x: x["consumption"] / x["pop"],
                del_t=lambda x: x["year"].astype(int) - 2010,
            )
            .dropna()
            .query("cons_pcap > 0")
        )
    elif material == "cement":
        df_raw_cons = pd.read_excel(
            f"{datapath}/{material_data[material]['dir']}{material_data[material]['file']}",
            sheet_name="Regions",
            skiprows=122,
            nrows=26,
        )
        df_raw_cons = df_raw_cons.rename(
            {"Region #": "reg_no", "Region": "region"}, axis=1
        )
        df_raw_cons = df_raw_cons.melt(
            id_vars=["region", "reg_no"], var_name="year", value_name="consumption"
        )
        # Merge and organize data
        df_cons = (
            pd.merge(df_raw_cons, df_pop.drop("region", axis=1), on=["reg_no", "year"])
            .merge(df_gdp[["reg_no", "year", "gdp_pcap"]], on=["reg_no", "year"])
            .assign(
                # Cement consumption is recorded in different units than
                # steel/aluminum, hence the extra 10**6 scaling.
                cons_pcap=lambda x: x["consumption"] / x["pop"] / 10**6,
                del_t=lambda x: x["year"].astype(int) - 2010,
            )
            .dropna()
            .query("cons_pcap > 0")
        )
    else:
        log.error(
            "non-available material selected. must be one of [aluminum, steel, cement]"
        )
        df_cons = None
    return df_cons
def read_pop_from_scen(scen: "Scenario") -> pd.DataFrame:
    """Extract population data from a MESSAGEix scenario.

    Population is read from the activity bounds of the "Population" technology.
    Only years from 2020 onwards are kept.

    Parameters
    ----------
    scen
        The MESSAGEix scenario containing population data.

    Returns
    -------
    pd.DataFrame
        Columns: region, year, pop.mil (population in millions).
    """
    raw = scen.par("bound_activity_up", {"technology": "Population"})
    recent = raw[raw["year_act"] >= 2020]
    renamed = recent.rename(
        columns={"year_act": "year", "value": "pop.mil", "node_loc": "region"}
    )
    return renamed.drop(columns=["mode", "time", "technology", "unit"])
def read_gdp_ppp_from_scen(scen: "Scenario") -> pd.DataFrame:
    """Extract GDP (PPP) data from a MESSAGEix scenario.

    When the scenario has no GDP_PPP data, GDP (MER) is converted using the
    scenario's MERtoPPP factors, falling back to packaged default factors.

    Parameters
    ----------
    scen
        The MESSAGEix scenario containing GDP data.

    Returns
    -------
    pd.DataFrame
        Columns: year, region, gdp_ppp (years >= 2020 only).
    """
    has_ppp = len(scen.par("bound_activity_up", filters={"technology": "GDP_PPP"}))
    if has_ppp:
        gdp = scen.par("bound_activity_up", {"technology": "GDP_PPP"}).rename(
            {"value": "gdp_ppp", "year_act": "year"}, axis=1
        )[["year", "node_loc", "gdp_ppp"]]
    else:
        gdp_mer = scen.par("bound_activity_up", {"technology": "GDP"})
        mer_to_ppp = scen.par("MERtoPPP")
        if not len(mer_to_ppp):
            # Scenario carries no conversion factors: use packaged defaults.
            mer_to_ppp = pd.read_csv(
                message_ix_models.util.package_data_path(
                    "material", "other", "mer_to_ppp_default.csv"
                )
            )
        mer_to_ppp = mer_to_ppp.set_index(["node", "year"])
        merged = gdp_mer.merge(
            mer_to_ppp.reset_index()[["node", "year", "value"]],
            left_on=["node_loc", "year_act"],
            right_on=["node", "year"],
        )
        # value_x is GDP (MER), value_y the MER-to-PPP factor.
        merged["gdp_ppp"] = merged["value_y"] * merged["value_x"]
        gdp = merged[["year", "node_loc", "gdp_ppp"]]
    return gdp.loc[gdp.year >= 2020].rename(columns={"node_loc": "region"})
def read_socio_economic_projection(
    scen: "Scenario",
) -> tuple[pd.DataFrame, pd.DataFrame]:
    """Read socio-economic projections (population and GDP).

    Both series are read from the scenario when available; otherwise default
    exogenous timeseries are used.

    Parameters
    ----------
    scen
        Scenario to query for population and GDP (PPP) projections.

    Returns
    -------
    tuple[pd.DataFrame, pd.DataFrame]
        ``(df_gdp, df_pop)`` in long format, each with a "region" column.
    """
    datapath = message_ix_models.util.package_data_path("material")
    # read pop projection from scenario
    df_pop = read_pop_from_scen(scen)
    if df_pop.empty:
        log.info(
            "Scenario does not provide Population projections. Reading default "
            "timeseries instead"
        )
        ctx = Context.get_instance(-1)
        ctx.update(regions="R12")
        df_pop = (
            get_ssp_soc_eco_data(ctx, "IIASA", "POP", "Population")
            .rename(
                columns={"year_act": "year", "value": "pop.mil", "node_loc": "region"}
            )
            .drop(["mode", "time", "technology", "unit"], axis=1)
        )
    # read gdp (PPP) projection from scenario
    df_gdp = read_gdp_ppp_from_scen(scen)
    # if not retrievable, read default from exogenous file instead
    if df_gdp.empty:
        # Fix: trailing space added so the message does not render as
        # "defaulttimeseries".
        log.info(
            "Scenario does not provide GDP projections. Reading default "
            "timeseries instead"
        )
        df_gdp = pd.read_excel(f"{datapath}/other{file_gdp}", sheet_name="data_R12")
        df_gdp = (
            df_gdp[df_gdp["Scenario"] == "baseline"]
            .loc[:, ["Region", *[i for i in df_gdp.columns if isinstance(i, int)]]]
            .melt(id_vars="Region", var_name="year", value_name="gdp_ppp")
            .query('Region != "World"')
            .assign(
                year=lambda x: x["year"].astype(int),
                region=lambda x: "R12_" + x["Region"],
            )
        )
    return df_gdp, df_pop
def derive_demand(
    material: Literal["cement", "steel", "aluminum"],
    scen: "Scenario",
    ssp: Union[Literal["SSP1", "SSP2", "SSP3", "SSP4", "SSP5", "LED"], str] = "SSP2",
    new: bool = False,
    model: Literal["least-squares", "quantile"] = "least-squares",
):
    """Derive a ``demand`` parameter projection for *material*.

    Fits a per-capita demand regression on historical data, scales the fitted
    parameters by the SSP-specific demand mode (least-squares model only),
    applies the regression to the scenario's socio-economic projection and
    converges base-year differences with a Gompertz function.

    Parameters
    ----------
    material
        Commodity to project demand for.
    scen
        Scenario providing socio-economic (population/GDP) projections.
    ssp
        SSP name used to select the demand mode (see ``ssp_mode_map``).
    new
        If True, read historical steel consumption from an alternative CSV.
    model
        Regression approach used on the historical data.

    Returns
    -------
    pd.DataFrame
        ``demand`` parameter data in MESSAGEix format.

    Raises
    ------
    ValueError
        If *model* is not a supported regression type.
    """
    if model not in ("least-squares", "quantile"):
        # Fail fast: an unknown model would otherwise leave params_opt unbound
        # and raise a confusing NameError further below.
        raise ValueError(
            f"model must be 'least-squares' or 'quantile', got {model!r}"
        )
    datapath = message_ix_models.util.package_data_path("material")
    df_gdp, df_pop = read_socio_economic_projection(scen)
    # get base year demand of material
    df_base_demand = read_base_demand(
        f"{datapath}/{material_data[material]['dir']}/demand_{material}.yaml"
    )
    # get historical data (material consumption, pop, gdp)
    if new:
        # HACK: hard-coded developer-machine path; this branch only works on
        # the original author's system. TODO: ship this dataset as package data.
        df_cons = pd.read_csv(
            "/Users/florianmaczek/PycharmProjects/IEA-statistics-analysis/input_data/steel/steel_consumption.csv"
        )
    else:
        df_cons = read_hist_mat_demand(material)
    config = FittingConfig(**fitting_dict[material])  # type: ignore
    x_data = tuple(pd.Series(df_cons[col]) for col in config.x_data)
    # run regression on historical data
    if model == "least-squares":
        params_opt = curve_fit(
            config.function,
            xdata=x_data,
            ydata=df_cons["cons_pcap"],
            p0=config.initial_guess,
        )[0]
        mode = ssp_mode_map[ssp]
        log.info(f"adjust regression parameters according to mode: {mode}")
        log.info(f"before adjustment: {params_opt}")
        # Modifiers are applied positionally to the fitted parameter vector.
        mods = list(mode_modifiers_dict[mode][material].values())
        for idx, multiplier in enumerate(mods):
            params_opt[idx] *= multiplier
        log.info(f"after adjustment: {params_opt}")
    else:
        # Median (tau=0.5) quantile regression; no SSP-mode adjustment here.
        params_opt = perform_quantile_reg(
            x_data,
            df_cons["cons_pcap"],
            config.function,
            config.initial_guess,
            0.5,
        )
    # prepare df for applying regression model and project demand
    df_all = prepare_model_input(df_pop, df_gdp, df_base_demand)
    df_all["demand_pcap0"] = df_all.apply(
        lambda row: config.function(tuple(row[i] for i in config.x_data), *params_opt),
        axis=1,
    )
    # correct base year difference with convergence function
    df_final = project_demand(df_all, config.phi, config.mu)
    # format to MESSAGEix standard
    df_par = format_to_demand_par(df_final, material)
    return df_par
def gen_demand_petro(
    scenario: "Scenario",
    chemical: Literal["HVC", "methanol", "NH3"],
    gdp_elasticity_2020: float,
    gdp_elasticity_2030: float,
):
    """Generate petrochemical demand projections based on GDP elasticity.

    This function calculates the demand for petrochemicals (HVC, methanol, NH3) using
    distinct GDP elasticity values for 2020-2030 and post-2030. GDP projections are
    queried from scenario if available or read from external data file.

    Parameters
    ----------
    scenario
        The MESSAGEix scenario containing input data.
    chemical
        The petrochemical for which demand is being projected.
    gdp_elasticity_2020
        GDP elasticity value for the period up to 2020.
    gdp_elasticity_2030
        GDP elasticity value for the period after 2030.

    Returns
    -------
    pd.DataFrame
        ``demand`` parameter data

    Raises
    ------
    ValueError
        If *chemical* is not one of the implemented commodities.
    """
    chemicals_implemented = ["HVC", "methanol", "NH3"]
    if chemical not in chemicals_implemented:
        raise ValueError(
            f"'{chemical}' not supported. Choose one of {chemicals_implemented}"
        )
    s_info = ScenarioInfo(scenario)
    modelyears = s_info.Y  # s_info.Y is only for modeling years
    fy = scenario.firstmodelyear

    def get_demand_t1_with_income_elasticity(
        demand_t0, income_t0, income_t1, elasticity
    ):
        """
        Calculate demand for the next year using income elasticity.

        Parameters
        ----------
        demand_t0 : pd.Series
            Demand in the current year.
        income_t0 : pd.Series
            Income in the current year.
        income_t1 : pd.Series
            Income in the next year.
        elasticity : float
            Income elasticity value.

        Returns
        -------
        pd.Series
            Projected demand for the next year.
        """
        # demand grows by (elasticity * relative income growth)
        return (
            elasticity * demand_t0.mul(((income_t1 - income_t0) / income_t0), axis=0)
        ) + demand_t0

    # Build a region x year table of GDP (PPP). Preference order:
    # 1) scenario GDP_PPP technology, 2) scenario GDP (MER) converted with
    # default MER-to-PPP factors, 3) packaged exogenous GDP timeseries.
    if "GDP_PPP" in list(scenario.set("technology")):
        gdp_ppp = scenario.par("bound_activity_up", {"technology": "GDP_PPP"})
        df_gdp_ts = gdp_ppp.pivot(
            index="node_loc", columns="year_act", values="value"
        ).reset_index()
        # Drop columns for years before the first model year.
        num_cols = [i for i in df_gdp_ts.columns if isinstance(i, int)]
        hist_yrs = [i for i in num_cols if i < fy]
        df_gdp_ts = (
            df_gdp_ts.drop([i for i in hist_yrs if i in df_gdp_ts.columns], axis=1)
            .set_index("node_loc")
            .sort_index()
        )
    elif "GDP" in list(scenario.set("technology")):
        gdp_mer = scenario.par("bound_activity_up", {"technology": "GDP"})
        mer_to_ppp = pd.read_csv(
            package_data_path("material", "other", "mer_to_ppp_default.csv")
        ).set_index(["node", "year"])
        # TODO: might need to be re-activated for different SSPs
        gdp_mer = gdp_mer.merge(
            mer_to_ppp.reset_index()[["node", "year", "value"]],
            left_on=["node_loc", "year_act"],
            right_on=["node", "year"],
        )
        # value_x is GDP (MER), value_y the MER-to-PPP conversion factor.
        gdp_mer["gdp_ppp"] = gdp_mer["value_y"] * gdp_mer["value_x"]
        gdp_mer = gdp_mer[["year", "node_loc", "gdp_ppp"]].reset_index()
        gdp_mer["Region"] = gdp_mer["node_loc"]  # .str.replace("R12_", "")
        df_gdp_ts = gdp_mer.pivot(
            index="Region", columns="year", values="gdp_ppp"
        ).reset_index()
        # Drop columns for years before the first model year.
        num_cols = [i for i in df_gdp_ts.columns if isinstance(i, int)]
        hist_yrs = [i for i in num_cols if i < fy]
        df_gdp_ts = (
            df_gdp_ts.drop([i for i in hist_yrs if i in df_gdp_ts.columns], axis=1)
            .set_index("Region")
            .sort_index()
        )
    else:
        # Fallback: packaged exogenous GDP (PPP) timeseries.
        df_gdp = pd.read_excel(
            f"{message_ix_models.util.package_data_path('material')}/other{file_gdp}",
            sheet_name="data_R12",
        )
        df_gdp_ts = (
            df_gdp[df_gdp["Scenario"] == "baseline"]
            .loc[:, ["Region", *[i for i in df_gdp.columns if isinstance(i, int)]]]
            .melt(id_vars="Region", var_name="year", value_name="gdp_ppp")
            .query('Region != "World"')
            .assign(
                year=lambda x: x["year"].astype(int),
                region=lambda x: "R12_" + x["Region"],
            )
            .pivot(index="region", columns="year", values="gdp_ppp")
        )
    # Base-year (2020) demand per region from the packaged YAML file.
    df_demand_2020 = read_base_demand(
        package_data_path()
        / "material"
        / f"{material_data[chemical]['dir']}/demand_{chemical}.yaml"
    )
    df_demand_2020 = df_demand_2020.rename({"region": "Region"}, axis=1)
    df_demand = df_demand_2020.pivot(index="Region", columns="year", values="value")
    dem_next_yr = df_demand
    # Walk forward through the model periods, compounding demand with the
    # elasticity applicable to each period (post-2030 uses the 2030 value).
    for i in range(len(modelyears) - 1):
        income_year1 = modelyears[i]
        income_year2 = modelyears[i + 1]
        if income_year2 >= 2030:
            dem_next_yr = get_demand_t1_with_income_elasticity(
                dem_next_yr,
                df_gdp_ts[income_year1],
                df_gdp_ts[income_year2],
                gdp_elasticity_2030,
            )
        else:
            dem_next_yr = get_demand_t1_with_income_elasticity(
                dem_next_yr,
                df_gdp_ts[income_year1],
                df_gdp_ts[income_year2],
                gdp_elasticity_2020,
            )
        df_demand[income_year2] = dem_next_yr
    # Reshape back to long format for the MESSAGEix demand parameter.
    df_melt = df_demand.melt(ignore_index=False).reset_index()
    level = "demand" if chemical in ["HVC", "methanol"] else "final_material"
    return make_df(
        "demand",
        unit="t",
        level=level,
        value=df_melt.value,
        time="year",
        commodity=chemical,
        year=df_melt.year,
        node=df_melt["Region"],
    )
def perform_quantile_reg(
    x: tuple[pd.Series, ...],
    y: pd.Series,
    func: Callable,
    initial_params: List[int],
    tau: float = 0.5,
):
    """Fit *func* to ``(x, y)`` by minimizing the pinball (quantile) loss.

    Parameters
    ----------
    x
        Tuple of predictor series, passed through to ``func``.
    y
        Observed target values.
    func
        Model of the form ``func(x, *params)`` returning predictions.
    initial_params
        Starting point for the Nelder-Mead search; its length determines the
        number of fitted parameters.
    tau
        Quantile to fit (0.5 gives median regression).

    Returns
    -------
    tuple
        Optimized parameters, one per entry of ``initial_params``.
    """

    def quantile_loss(params, x, y, tau):
        # Pinball loss: residuals above the quantile weighted by tau, below by
        # (1 - tau); at tau=0.5 this is proportional to absolute error.
        residuals = y - func(x, *params)
        return np.sum(
            (tau * np.maximum(residuals, 0)) + ((1 - tau) * np.maximum(-residuals, 0))
        )

    res = opt.minimize(
        quantile_loss, initial_params, args=(x, y, tau), method="Nelder-Mead"
    )
    # Fix: do not assume exactly three parameters — the cement and aluminum
    # fits use two (see fitting_dict), which made the old 3-way unpack raise.
    return tuple(res.x)