Source code for message_ix_models.tools.bilateralize.bare_to_scenario

# -*- coding: utf-8 -*-
"""
Move data from bare files to a dictionary to update a MESSAGEix scenario

This script is the second step in implementing the bilateralize tool.
It moves data from /data/bilateralize/[your_trade_commodity]/bare_files/
to a dictionary compatible with updating a MESSAGEix scenario.
"""

# Import packages
import logging
import os
import pickle
from pathlib import Path
from typing import Any, Dict

import numpy as np
import pandas as pd

from message_ix_models.tools.bilateralize.historical_calibration import (
    build_hist_new_capacity_flow,
    build_hist_new_capacity_trade,
    build_historical_activity,
)
from message_ix_models.tools.bilateralize.utils import get_logger, load_config
from message_ix_models.util import package_data_path


# %% Broadcast vintage years
def broadcast_years(
    df: pd.DataFrame, year_type: str, year_list: list[int]
) -> pd.DataFrame:
    """
    Broadcast vintage, relation, or activity years.

    Args:
        df: Input parameter DataFrame
        year_type: Type of year to broadcast
            (e.g., 'year_vtg', 'year_rel', 'year_act')
        year_list: List of years to broadcast

    Returns:
        pd.DataFrame: DataFrame with expanded rows for each year
    """
    all_new_rows = []
    for _, row in df.iterrows():
        for y in year_list:
            new_row = row.copy()
            new_row[year_type] = int(y)
            all_new_rows.append(new_row)
    result_df = pd.concat([df, pd.DataFrame(all_new_rows)], ignore_index=True)
    result_df = result_df[result_df[year_type] != "broadcast"]
    return result_df.drop_duplicates()
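
# A minimal, self-contained sketch of how broadcast_years expands a
# "broadcast" placeholder row. The column values below are hypothetical and
# only illustrate the mechanics; real inputs come from the bare CSV files.
def _example_broadcast_years() -> pd.DataFrame:
    bare = pd.DataFrame(
        {
            "technology": ["gas_exp"],  # hypothetical technology name
            "year_vtg": ["broadcast"],  # placeholder to be expanded
            "value": [1.0],
        }
    )
    # Returns one row per year in year_list; the placeholder row is dropped
    return broadcast_years(df=bare, year_type="year_vtg", year_list=[2025, 2030])
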
# %% Broadcast years to create vintage-activity year pairs.
def broadcast_yv_ya(
    df: pd.DataFrame,
    ya_list: list[int],
    yv_list: list[int],
    tec_lifetime: pd.DataFrame,
):
    """
    Broadcast years to create vintage-activity year pairs.

    Args:
        df: Input parameter DataFrame
        ya_list: List of activity years to consider
        yv_list: List of vintage years to consider
        tec_lifetime: Technical lifetime of the technology, provided as a
            DataFrame

    Returns:
        pd.DataFrame: DataFrame with expanded rows for each vintage-activity
            year pair
    """
    all_new_rows = []
    tecltdf = tec_lifetime.copy()
    tecltdf["teclt"] = tecltdf["value"]
    lts = df.merge(
        tecltdf[["node_loc", "technology", "teclt"]].drop_duplicates(),
        on=["node_loc", "technology"],
        how="left",
    )

    # Process each row in the original DataFrame
    for _, row in lts.iterrows():
        teclt_row = row["teclt"]
        # For each activity year
        for ya in ya_list:
            # Keep vintage years no later than year_act and within the
            # technical lifetime of the technology
            yv_list = [yv for yv in ya_list if yv <= ya]
            yv_list = [yv for yv in yv_list if yv >= ya - teclt_row]
            # Create new rows for each vintage year
            for yv in yv_list:
                new_row = row.copy()
                new_row["year_act"] = int(ya)
                new_row["year_vtg"] = int(yv)
                all_new_rows.append(new_row)

    # Combine the new rows and drop the lifetime helper column
    result_df = pd.DataFrame(all_new_rows).drop(["teclt"], axis=1)
    result_df = result_df[result_df["year_vtg"] != "broadcast"]
    return result_df
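
# A minimal sketch of broadcast_yv_ya with hypothetical node and technology
# labels. With a 10-year lifetime and activity years [2025, 2030], the result
# holds three (year_vtg, year_act) pairs: (2025, 2025), (2025, 2030), and
# (2030, 2030).
def _example_broadcast_yv_ya() -> pd.DataFrame:
    bare = pd.DataFrame(
        {
            "node_loc": ["R12_NAM"],
            "technology": ["gas_exp"],
            "year_vtg": ["broadcast"],
            "year_act": ["broadcast"],
            "value": [1.0],
        }
    )
    lifetime = pd.DataFrame(
        {"node_loc": ["R12_NAM"], "technology": ["gas_exp"], "value": [10]}
    )
    return broadcast_yv_ya(
        df=bare, ya_list=[2025, 2030], yv_list=[2025, 2030], tec_lifetime=lifetime
    )
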
# %% Full broadcast function
def full_broadcast(
    data_dict: dict,
    tec: str,
    ty: str,
    ya_list: list[int],
    yv_list: list[int],
    log: logging.Logger,
) -> dict:
    """
    Broadcast every year dimension of the parameter dataframes for one
    technology and parameter type.

    Args:
        data_dict: Dictionary of parameter dataframes
        tec: Technology name
        ty: Type of parameter ('trade' or 'flow')
        ya_list: List of activity years
        yv_list: List of vintage years
        log: Logger

    Returns:
        data_dict: Dictionary of parameter dataframes with broadcast years
    """
    for i in data_dict[ty].keys():
        # Relation years broadcast over activity years
        if "year_rel" in data_dict[ty][i].columns:
            if data_dict[ty][i]["year_rel"].iloc[0] == "broadcast":
                log.info(f"Parameter {i} in {tec} {ty} broadcast for year_rel")
                data_dict[ty][i] = broadcast_years(
                    df=data_dict[ty][i], year_type="year_rel", year_list=ya_list
                )
                data_dict[ty][i]["year_act"] = data_dict[ty][i]["year_rel"]

        if (
            "year_vtg" in data_dict[ty][i].columns
            and "year_act" in data_dict[ty][i].columns
        ):
            if (
                data_dict[ty][i]["year_vtg"].iloc[0] == "broadcast"
                and data_dict[ty][i]["year_act"].iloc[0] == "broadcast"
            ):
                log.info(f"{i} in {tec} {ty} broadcast for year_vtg+year_act")
                teclt_df = data_dict[ty]["technical_lifetime"].copy()
                data_dict[ty][i] = broadcast_yv_ya(
                    df=data_dict[ty][i],
                    ya_list=ya_list,
                    yv_list=yv_list,
                    tec_lifetime=teclt_df,
                )
            elif (
                data_dict[ty][i]["year_vtg"].iloc[0] == "broadcast"
                and data_dict[ty][i]["year_act"].iloc[0] != "broadcast"
            ):
                log.info(f"{i} in {tec} {ty} broadcast for year_vtg")
                data_dict[ty][i] = broadcast_years(
                    df=data_dict[ty][i], year_type="year_vtg", year_list=yv_list
                )
        elif (
            "year_vtg" in data_dict[ty][i].columns
            and "year_act" not in data_dict[ty][i].columns
        ):
            if data_dict[ty][i]["year_vtg"].iloc[0] == "broadcast":
                log.info(f"{i} in {tec} {ty} broadcast for year_vtg")
                data_dict[ty][i] = broadcast_years(
                    df=data_dict[ty][i], year_type="year_vtg", year_list=yv_list
                )
        elif (
            "year_vtg" not in data_dict[ty][i].columns
            and "year_act" in data_dict[ty][i].columns
        ):
            if data_dict[ty][i]["year_act"].iloc[0] == "broadcast":
                log.info(f"{i} in {tec} {ty} broadcast for year_act")
                data_dict[ty][i] = broadcast_years(
                    df=data_dict[ty][i], year_type="year_act", year_list=ya_list
                )

    return data_dict
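
# A minimal sketch of the dictionary layout full_broadcast expects, reusing
# the hypothetical frames from the examples above. A "technical_lifetime"
# entry must be present whenever both year_vtg and year_act carry the
# "broadcast" placeholder, since broadcast_yv_ya needs the lifetime.
def _example_full_broadcast(log: logging.Logger) -> dict:
    lifetime = pd.DataFrame(
        {
            "node_loc": ["R12_NAM"],
            "technology": ["gas_exp"],
            "year_vtg": [2025],
            "value": [10],
        }
    )
    capacity_factor = pd.DataFrame(
        {
            "node_loc": ["R12_NAM"],
            "technology": ["gas_exp"],
            "year_vtg": ["broadcast"],
            "year_act": ["broadcast"],
            "value": [1.0],
        }
    )
    data_dict = {
        "trade": {"technical_lifetime": lifetime, "capacity_factor": capacity_factor},
        "flow": {},
    }
    return full_broadcast(
        data_dict=data_dict,
        tec="gas_trade",  # hypothetical technology name, used for logging
        ty="trade",
        ya_list=[2025, 2030],
        yv_list=[2025, 2030],
        log=log,
    )
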
# %% Build out bare sheets
def build_parameter_sheets(
    log, project_name: str | None = None, config_name: str | None = None
):
    """
    Read the input CSV files and build the tech sets and parameters.

    Args:
        log: Logger
        project_name (str, optional): Project name
            (message_ix_models/project/[THIS])
        config_name (str, optional): Name of the config file. If None, uses
            the default config from data/bilateralize/config_default.yaml

    Returns:
        outdict: Dictionary of parameter dataframes
    """
    # Load config
    config, config_path = load_config(project_name, config_name)
    covered_tec = config.get("covered_trade_technologies", {})

    outdict = dict()
    ya_list = config["timeframes"]["year_act_list"]
    yv_list = config["timeframes"]["year_vtg_list"]

    for tec in covered_tec:
        tecpath = os.path.join(Path(package_data_path("bilateralize")), tec)

        # Read the bare CSV files for the trade and flow technologies
        data_dict: Dict[str, Dict[str, Any]] = {"trade": {}, "flow": {}}
        for ty in ["trade", "flow"]:
            if ty == "trade":
                tpath = os.path.join(tecpath, "bare_files")
            elif ty == "flow":
                tpath = os.path.join(tecpath, "bare_files", "flow_technology")
            for csv_file in Path(tpath).glob("*.csv"):
                data_dict[ty][csv_file.stem] = pd.read_csv(csv_file)

        # Broadcast the data
        for ty in ["trade", "flow"]:
            data_dict = full_broadcast(
                data_dict=data_dict,
                tec=tec,
                ty=ty,
                ya_list=ya_list,
                yv_list=yv_list,
                log=log,
            )

        # Imports do not vintage
        for par in ["capacity_factor", "input", "output"]:
            if par in data_dict["trade"]:
                vdf = data_dict["trade"][par]
                vdf = vdf[
                    (
                        (vdf["technology"].str.contains("_imp"))
                        & (vdf["year_vtg"] == vdf["year_act"])
                    )
                    | (vdf["technology"].str.contains("_exp_"))
                ]
                data_dict["trade"][par] = vdf

        # Variable costs for flows should not broadcast over vintages
        for par in ["var_cost"]:
            if par in data_dict["flow"]:
                vdf = data_dict["flow"][par]
                vdf = vdf[vdf["year_act"] == vdf["year_vtg"]]
                data_dict["flow"][par] = vdf

        outdict[tec] = data_dict

    return outdict
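
# A hedged usage sketch: build all parameter sheets with the default config.
# It assumes the bare CSV files were already generated (the first step of the
# bilateralize tool) under data/bilateralize/[technology]/bare_files/.
def _example_build_parameter_sheets() -> dict:
    log = get_logger(__name__)
    # outdict[technology]["trade" | "flow"][parameter] -> pd.DataFrame
    return build_parameter_sheets(log=log, project_name=None, config_name=None)
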
def calibrate_historical_shipping(
    config: dict,
    trade_dict: dict,
    covered_tec: list[str],
    project_name: str | None = None,
    config_name: str | None = None,
):
    """
    Add historical new capacity and activity for maritime shipping.

    Args:
        config: Loaded configuration dictionary
        trade_dict: Dictionary of parameter dataframes per trade technology
        covered_tec: List of covered trade technologies
        project_name: Name of the project
        config_name: Name of the config file

    Returns:
        trade_dict: Dictionary updated with historical shipping calibration
    """
    # Historical new capacity for maritime shipping
    shipping_fuel_dict = config["shipping_fuels"]

    # TODO: Add coal
    hist_cr_loil = build_hist_new_capacity_flow(
        infile="Crude Tankers.csv",
        ship_type="crudeoil_tanker_loil",
        project_name=project_name,
        config_name=config_name,
    )
    hist_lh2_loil = build_hist_new_capacity_flow(
        infile="LH2 Tankers.csv",
        ship_type="lh2_tanker_loil",
        project_name=project_name,
        config_name=config_name,
    )

    hist_lng = pd.DataFrame()
    for f in ["loil", "LNG"]:
        hist_lng_f = build_hist_new_capacity_flow(
            infile="LNG Tankers.csv",
            ship_type="LNG_tanker_" + f,
            project_name=project_name,
            config_name=config_name,
        )
        hist_lng_f["value"] *= shipping_fuel_dict["LNG_tanker"]["LNG_tanker_" + f]
        hist_lng = pd.concat([hist_lng, hist_lng_f])

    hist_oil = pd.DataFrame()
    for f in ["loil", "foil", "eth"]:
        hist_oil_f = build_hist_new_capacity_flow(
            infile="Oil Tankers.csv",
            ship_type="oil_tanker_" + f,
            project_name=project_name,
            config_name=config_name,
        )
        hist_oil_f["value"] *= shipping_fuel_dict["oil_tanker"]["oil_tanker_" + f]
        hist_oil = pd.concat([hist_oil, hist_oil_f])
    hist_eth = hist_oil[hist_oil["technology"] != "oil_tanker_foil"]

    nc_dict = {
        "crudeoil_shipped": hist_cr_loil,
        "lh2_shipped": hist_lh2_loil,
        "LNG_shipped": hist_lng,
        "eth_shipped": hist_eth,
        "foil_shipped": hist_oil,
        "loil_shipped": hist_oil,
    }
    for tec in nc_dict.keys():
        trade_dict[tec]["flow"]["historical_new_capacity"] = nc_dict[tec]

    # Historical activity should only be added for technologies in input
    for tec in covered_tec:
        input_tecs = trade_dict[tec]["trade"]["input"]["technology"]
        if "historical_activity" in trade_dict[tec]["trade"].keys():
            tdf = trade_dict[tec]["trade"]["historical_activity"]
            tdf = tdf[tdf["technology"].isin(input_tecs)]
            trade_dict[tec]["trade"]["historical_activity"] = tdf
        if "historical_new_capacity" in trade_dict[tec]["trade"].keys():
            tdf = trade_dict[tec]["trade"]["historical_new_capacity"]
            tdf = tdf[tdf["technology"].isin(input_tecs)]
            trade_dict[tec]["trade"]["historical_new_capacity"] = tdf

    return trade_dict
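
# A minimal sketch of the fuel-share split used above, assuming a
# hypothetical "shipping_fuels" config entry: new capacity per tanker type is
# scaled by the share of the fleet running on each fuel.
def _example_fuel_share_split() -> pd.DataFrame:
    shipping_fuel_dict = {"LNG_tanker": {"LNG_tanker_loil": 0.3, "LNG_tanker_LNG": 0.7}}
    hist = pd.DataFrame({"technology": ["LNG_tanker_loil"], "value": [100.0]})
    hist["value"] *= shipping_fuel_dict["LNG_tanker"]["LNG_tanker_loil"]
    return hist  # 30 units of capacity attributed to loil-fuelled LNG tankers
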
def bare_to_scenario(
    project_name: str | None = None,
    config_name: str | None = None,
    scenario_parameter_name: str = "scenario_parameters.pkl",
    p_drive_access: bool = False,
):
    """
    Move data from bare files to a dictionary to update a MESSAGEix scenario.

    Args:
        project_name: Name of the project (e.g., 'newpathways')
        config_name: Name of the config file (e.g., 'config.yaml')
        scenario_parameter_name: Name of the scenario parameter file
        p_drive_access: Whether the data required for historical calibration
            is accessible

    Returns:
        trade_dict: Dictionary compatible with updating a MESSAGEix scenario
    """
    # Bring in configuration
    config, config_path, tec_config = load_config(
        project_name=project_name, config_name=config_name, load_tec_config=True
    )
    covered_tec = config["covered_trade_technologies"]
    message_regions = config["scenario"]["regions"]

    # Get logger
    log = get_logger(__name__)

    # Read and inflate sheets based on model horizon
    trade_dict = build_parameter_sheets(
        log=log, project_name=project_name, config_name=config_name
    )

    if p_drive_access:
        # Historical calibration for trade technology
        histdf = build_historical_activity(
            message_regions=message_regions,
            project_name=project_name,
            config_name=config_name,
            reimport_BACI=False,
        )
        histdf.to_csv("check.csv")  # Debug dump of the uncalibrated data
        histdf = histdf[histdf["year_act"].isin([2000, 2005, 2010, 2015, 2020, 2023])]
        histdf["year_act"] = np.where(
            (histdf["year_act"] == 2023),
            2025,  # TODO: 2023 to 2025 only for now
            histdf["year_act"],
        )
        histdf = histdf[histdf["value"] > 0]
        histdf["technology"] = histdf["technology"].str.replace("ethanol_", "eth_")
        histdf["technology"] = histdf["technology"].str.replace("fueloil_", "foil_")

        histnc = build_hist_new_capacity_trade(
            message_regions=message_regions,
            project_name=project_name,
            config_name=config_name,
        )

        hist_tec = {}
        for tec in [
            c
            for c in covered_tec
            if c not in ["crudeoil_piped", "foil_piped", "loil_piped"]
        ]:
            add_tec = tec_config[tec][tec + "_trade"]["trade_technology"] + "_exp"
            hist_tec[tec] = add_tec

        for tec in hist_tec.keys():
            log.info("Add historical activity for " + tec)
            add_df = histdf[histdf["technology"].str.contains(hist_tec[tec])]
            trade_dict[tec]["trade"]["historical_activity"] = add_df

            log.info("Add historical new capacity for " + tec)
            add_df = histnc[histnc["technology"].str.contains(hist_tec[tec])]
            trade_dict[tec]["trade"]["historical_new_capacity"] = add_df

        trade_dict = calibrate_historical_shipping(
            config=config,
            trade_dict=trade_dict,
            covered_tec=covered_tec,
            project_name=project_name,
            config_name=config_name,
        )

    # Ensure flow technologies are only added once
    covered_flow_tec: list[str] = []
    for tec in covered_tec:
        if "input" in list(trade_dict[tec]["flow"].keys()):
            flow_tecs = list(trade_dict[tec]["flow"]["input"]["technology"].unique())
            for par in trade_dict[tec]["flow"].keys():
                trade_dict[tec]["flow"][par] = trade_dict[tec]["flow"][par][
                    ~trade_dict[tec]["flow"][par]["technology"].isin(covered_flow_tec)
                ]
            covered_flow_tec = covered_flow_tec + flow_tecs

    # Save trade dictionary
    tdf = os.path.join(os.path.dirname(config_path), scenario_parameter_name)
    with open(tdf, "wb") as file_handler:
        pickle.dump(trade_dict, file_handler)

    return trade_dict
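
# A hedged usage sketch of the full step: read the bare files, broadcast the
# year dimensions, optionally calibrate history (requires access to the
# calibration data), and pickle the result next to the config file. The
# project and config names are the illustrative ones from the docstring above.
def _example_bare_to_scenario() -> dict:
    return bare_to_scenario(
        project_name="newpathways",
        config_name="config.yaml",
        p_drive_access=False,
    )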