"""Utility functions for MESSAGEix-BMT (Buildings, Materials, Transport) integration."""
import logging
from typing import TYPE_CHECKING
import pandas as pd
from message_ix_models.model.material.data_power_sector import gen_data_power_sector
from message_ix_models.util import add_par_data
if TYPE_CHECKING:
from message_ix import Scenario
from message_ix_models import ScenarioInfo
log = logging.getLogger(__name__)
def _generate_vetting_csv(
original_demand: pd.DataFrame,
modified_demand: pd.DataFrame,
output_path: str,
) -> None:
"""Generate a CSV file showing material demand subtraction details.
Parameters
----------
original_demand : pd.DataFrame
Original demand data before subtraction
modified_demand : pd.DataFrame
Modified demand data after subtraction
output_path : str
Path where to save the vetting CSV file
"""
# Reset index to work with columns
orig = original_demand.reset_index()
mod = modified_demand.reset_index()
# Merge original and modified data
vetting_data = orig.merge(
mod,
on=["node", "year", "commodity"],
suffixes=("_original", "_modified"),
how="outer",
).fillna(0)
# Calculate subtraction amounts and percentages
vetting_data["subtracted_amount"] = (
vetting_data["value_original"] - vetting_data["value_modified"]
)
# Calculate percentage subtracted (avoid division by zero)
vetting_data["subtraction_percentage"] = (
vetting_data["subtracted_amount"]
/ vetting_data["value_original"].replace(0, 1)
* 100
)
# Replace infinite values with 0 (when original was 0)
vetting_data["subtraction_percentage"] = vetting_data[
"subtraction_percentage"
].replace([float("inf"), -float("inf")], 0)
# Round to reasonable precision
vetting_data["subtraction_percentage"] = vetting_data[
"subtraction_percentage"
].round(2)
# Select and rename columns for clarity
output_columns = [
"node",
"year",
"commodity",
"value_original",
"value_modified",
"subtracted_amount",
"subtraction_percentage",
]
vetting_data = vetting_data[output_columns].copy()
vetting_data.columns = [
"node",
"year",
"commodity",
"original_demand",
"modified_demand",
"subtracted_amount",
"subtraction_percentage",
]
# # Filter out rows where no subtraction occurred
# vetting_data = vetting_data[vetting_data["subtracted_amount"] > 0]
# Sort by commodity, node, year for better readability
vetting_data = vetting_data.sort_values(["commodity", "node", "year"])
# Save to CSV
vetting_data.to_csv(output_path, index=False)
log.info(f"Vetting CSV saved to: {output_path}")
# Log summary statistics
if len(vetting_data) > 0:
avg_pct = vetting_data["subtraction_percentage"].mean()
max_pct = vetting_data["subtraction_percentage"].max()
log.info(f"Average subtraction percentage: {avg_pct:.2f}%")
log.info(f"Max subtraction percentage: {max_pct:.2f}%")
# Maybe it is better to have one function for each method?
[docs]
def subtract_material_demand(
scenario: "Scenario",
info: "ScenarioInfo",
sturm_r: pd.DataFrame,
sturm_c: pd.DataFrame,
method: str = "bm_subtraction",
generate_vetting_csv: bool = True,
vetting_output_path: str = "material_demand_subtraction_vetting.csv",
) -> pd.DataFrame:
"""Subtract inter-sector material demand from existing demands in scenario.
This function provides different approaches for subtracting inter-sector material
demand from the original material demand, due to BM (material inputs for residential
and commercial building construction), PM (material inputs for power capacity), IM
(material inputs for infrastructures), TM (material inputs for new vehicles) links.
Parameters
----------
scenario : message_ix.Scenario
The scenario to modify
info : ScenarioInfo
Scenario information
sturm_r : pd.DataFrame
Residential STURM data
sturm_c : pd.DataFrame
Commercial STURM data
method : str, optional
Method to use for subtraction:
- "bm_subtraction": default, substract entire trajectory
- "im_subtraction": substract base year and rerun material demand projection
- "pm_subtraction": to be determined (currently treated as additional demand)
- "tm_subtraction": to be determined
generate_vetting_csv : bool, optional
Whether to generate a CSV file showing subtraction details (default: True)
vetting_output_path : str, optional
Path for the vetting CSV file (default:
"material_demand_subtraction_vetting.csv")
Returns
-------
pd.DataFrame
Modified demand data with material demand subtracted
"""
# Method adopted in NAVIGATE workflow 2023 by PNK
# Retrieve data once
target_commodities = ["cement", "steel", "aluminum"]
# Updated filter from level to commodities to avoid non-material commodities
mat_demand = scenario.par("demand", {"commodity": target_commodities})
index_cols = ["node", "year", "commodity"]
if method == "bm_subtraction":
# Store original demand for vetting if requested
original_demand = mat_demand.copy() if generate_vetting_csv else None
# Subtract the building material demand trajectory from existing demands
for rc, base_data, how in (
("resid", sturm_r, "right"),
("comm", sturm_c, "outer"),
):
new_col = f"demand_{rc}_const"
# - Drop columns.
# - Rename "value" to e.g. "demand_resid_const".
# - Extract MESSAGEix-Materials commodity name from STURM commodity name.
# - Drop other rows.
# - Set index.
df = (
base_data.drop(columns=["level", "time", "unit"])
.rename(columns={"value": new_col})
.assign(
commodity=lambda _df: _df.commodity.str.extract(
f"{rc}_mat_demand_(cement|steel|aluminum)",
expand=False,
# Directly provided by STURM reporting
# No need to multiply intensities and floor space
)
)
.dropna(subset=["commodity"])
.set_index(index_cols)
)
# Merge existing demands at level "demand".
# - how="right": drop all rows in par("demand", …) with no match in `df`.
# - how="outer": keep the union of rows in `mat_demand` (e.g. from sturm_r)
# and in `df` (from sturm_c); fill NA with zeroes.
mat_demand = mat_demand.join(df, on=index_cols, how=how).fillna(0)
# False if main() is being run for the second time on `scenario`
first_pass = "construction_resid_build" not in info.set["technology"]
# If not on the first pass, this modification is already performed; skip
if first_pass:
# - Compute new value = (existing value - STURM values), but no less than 0.
# - Drop intermediate column.
mat_demand = (
mat_demand.eval(
"value = value - demand_comm_const - demand_resid_const"
)
.assign(value=lambda df: df["value"].clip(0))
.drop(columns=["demand_comm_const", "demand_resid_const"])
)
# Generate vetting CSV if requested
if generate_vetting_csv and original_demand is not None:
_generate_vetting_csv(original_demand, mat_demand, vetting_output_path)
elif method == "im_subtraction":
# TODO: to be implemented
log.warning("Method 'im_subtraction' not implemented yet, using bm_subtraction")
return subtract_material_demand(
scenario, info, sturm_r, sturm_c, "bm_subtraction"
)
elif method == "pm_subtraction":
# TODO: Implement alternative method 2
log.warning("Method 'pm_subtraction' not implemented yet, using bm_subtraction")
return subtract_material_demand(
scenario, info, sturm_r, sturm_c, "bm_subtraction"
)
elif method == "tm_subtraction":
# TODO: Implement alternative method 3
log.warning("Method 'tm_subtraction' not implemented yet, using bm_subtraction")
return subtract_material_demand(
scenario, info, sturm_r, sturm_c, "bm_subtraction"
)
else:
raise ValueError(f"Unknown method: {method}")
return mat_demand
[docs]
def build_PM(context, scenario: "Scenario", **kwargs) -> "Scenario":
"""Build the material intensity for power capacities.
This function adds power sector material intensity parameters (input_cap_new,
input_cap_ret, output_cap_new, output_cap_ret) to the scenario if they do not
already exist.
Parameters
----------
context
The context of the scenario to be add intensity data to.
scenario : message_ix.Scenario
The scenario to add material-power sector linkages to.
**kwargs
Additional keyword arguments (ignored, for workflow compatibility).
"""
# Check if power sector material data already exists
if scenario.has_par("input_cap_new"):
try:
existing_data = scenario.par("input_cap_new")
if (
not existing_data.empty
and "cement" in existing_data.get("commodity", pd.Series()).values
):
log.info(
"Power sector material intensity data already exists "
"(found cement in input_cap_new). Skipping build_pm."
)
return scenario
except Exception as e:
log.warning(f"Could not check existing input_cap_new data: {e}")
log.info("Adding material intensity for power capacities...")
scenario.check_out()
try:
power_data = gen_data_power_sector(scenario, dry_run=False)
add_par_data(scenario, power_data, dry_run=False)
# but actually do not know how to provide log info while adding those parameters
log.info("Successfully added power sector material intensity data.")
except Exception as e:
log.error(f"Error adding power sector material data: {e}")
raise
return scenario