# -*- coding: utf-8 -*-
"""
Prepare edit files for bilateralize tool
This script is the first step in implementing the bilateralize tool.
It generates empty (or default valued) parameters that are required for
bilateralization, specified by commodity.
This step is optional in a workflow; users can move directly to the next
step (2_bare_to_scenario).
"""
# Import packages
import itertools
import logging
import os
import shutil
from pathlib import Path
from typing import Any, Dict
import message_ix
import numpy as np
import pandas as pd
import yaml
from message_ix_models.tools.bilateralize.calculate_distance import calculate_distance
from message_ix_models.tools.bilateralize.historical_calibration import (
build_historical_price,
)
from message_ix_models.tools.bilateralize.mariteam_calibration import calibrate_mariteam
from message_ix_models.tools.bilateralize.pull_gem import import_gem
from message_ix_models.tools.bilateralize.utils import get_logger, load_config
from message_ix_models.util import package_data_path
# %% Generate folders if missing
def folders_for_trade(covered_tec: list[str]):
    """
    Generate the data folder tree for each trade technology.

    For every technology in *covered_tec*, creates
    ``<data>/bilateralize/<tec>`` together with its ``edit_files`` and
    ``bare_files`` subfolders, each of which also gets a
    ``flow_technology`` subfolder. Existing folders are left untouched.

    Args:
        covered_tec: Trade technology names to create folders for.
    """
    base = Path(package_data_path("bilateralize"))
    for tec in covered_tec:
        tecpath = base / tec
        for folder in (
            tecpath,
            tecpath / "edit_files",
            tecpath / "edit_files" / "flow_technology",
            tecpath / "bare_files",
            tecpath / "bare_files" / "flow_technology",
        ):
            # exist_ok avoids the racy isdir-then-makedirs pattern
            os.makedirs(folder, exist_ok=True)
def define_networks(
    log,
    message_regions: str,
    covered_tec: list[str],
    config_dict: Dict[str, Dict[str, Any]],
    data_path: Path,
):
    """
    Define the trade network dataframe for each covered technology.

    Args:
        log: Logger used to report progress.
        message_regions: Regional resolution (e.g., "R12").
        covered_tec: Trade technologies to build networks for.
        config_dict: Per-parameter, per-technology configuration values.
        data_path: Root of the bilateralize data folder.

    Returns:
        dict: Mapping of technology name to a dataframe of the
        exporter/importer pairs marked for inclusion.

    Raises:
        Exception: If a network-specification sheet had to be generated
            (user must fill it in and rerun), if the generated sheet is
            incomplete, or if ``specify_network`` is not a boolean.
    """
    # Generate full combination of nodes to build technology-specific network
    node_path = package_data_path("node", message_regions + ".yaml")
    with open(node_path, "r", encoding="utf-8") as file_handler:
        node_set = yaml.safe_load(file_handler)
    node_set = [r for r in node_set.keys() if r not in ["World", "GLB"]]
    node_df = pd.DataFrame(itertools.product(node_set, node_set))
    node_df.columns = ["exporter", "importer"]
    # Drop self-trade pairs
    node_df = node_df[node_df["exporter"] != node_df["importer"]]
    network_setup = {}
    for tec in covered_tec:
        log.info(f"Defining network for {tec}")
        node_df_tec = node_df.copy()
        node_df_tec["export_technology"] = config_dict["trade_technology"][tec] + "_exp"
        if config_dict["trade_tech_suffix"][tec] is not None:
            node_df_tec["export_technology"] = (
                node_df_tec["export_technology"]
                + "_"
                + config_dict["trade_tech_suffix"][tec]
            )
        # Append the importer's region short name (second "_" token, lowercased)
        node_df_tec["export_technology"] = (
            node_df_tec["export_technology"]
            + "_"
            + node_df_tec["importer"].str.lower().str.split("_").str[1]
        )
        # If there are multiple trade "routes" (not flow, but decision) per technology
        n_routes = config_dict["trade_tech_number"][tec]
        if (n_routes is not None) and (n_routes != 1):
            ndt_out = pd.DataFrame()
            for i in range(1, n_routes + 1):
                ndt = node_df_tec.copy()
                ndt["export_technology"] = ndt["export_technology"] + "_" + str(i)
                ndt_out = pd.concat([ndt_out, ndt])
            node_df_tec = ndt_out.copy()
        node_df_tec["import_technology"] = config_dict["trade_technology"][tec] + "_imp"
        node_df_tec["INCLUDE? (No=0, Yes=1)"] = ""
        # Specify network (the function will stop to allow specification)
        if config_dict["specify_network"][tec] is True:
            try:
                specify_network_tec = pd.read_csv(
                    os.path.join(data_path, tec, "specify_network_" + tec + ".csv")
                )
            except FileNotFoundError:
                node_df_tec.to_csv(
                    os.path.join(data_path, tec, "specify_network_" + tec + ".csv"),
                    index=False,
                )
                raise Exception(
                    "The function stopped. Sheet specify_network_"
                    + tec
                    + ".csv has been generated. "
                    + "Fill in the specific pairs first and run again."
                )
            # BUG FIX: previously `any(...notnull())`, which accepted a sheet
            # with only one value filled in; the intent (and the message below)
            # require every value to be filled.
            if not specify_network_tec["INCLUDE? (No=0, Yes=1)"].notnull().all():
                raise Exception(
                    "The function stopped. Ensure that all values "
                    + "under 'INCLUDE? (No=0, Yes=1)' are filled"
                )
        elif config_dict["specify_network"][tec] is False:
            # No user specification requested: include every pair
            specify_network_tec = node_df_tec.copy()
            specify_network_tec["INCLUDE? (No=0, Yes=1)"] = 1
        else:
            raise Exception("Please use True or False.")
        # Keep only the pairs explicitly marked for inclusion
        network_setup[tec] = specify_network_tec[
            specify_network_tec["INCLUDE? (No=0, Yes=1)"] == 1
        ]
    return network_setup
# %% Build base parameter dataframe based on network dataframe
def build_parameterdf(
    par_name: str,
    network_df: pd.DataFrame,
    col_values: dict,
    common_years: dict | None = None,
    common_cols: dict | None = None,
    export_only: bool = False,
):
    """
    Build parameter dataframes based on the specified network dataframe.

    One row set is built for the export technology (indexed on the
    exporter node); unless *export_only*, a second set is built for the
    import technology (indexed on the importer node) and concatenated.

    Args:
        par_name: Parameter name (e.g., capacity_factor)
        network_df: Specified network dataframe with ``exporter``,
            ``importer``, ``export_technology`` and ``import_technology``
            columns.
        col_values: Values for other columns to populate as default
        common_years: Year-column defaults; if None, all year columns are
            set to the "broadcast" placeholder.
        common_cols: Mode/time-column defaults; if None, mode "M1" and
            annual time resolution are used.
        export_only: If True, only produces dataframe for export technology

    Returns:
        pd.DataFrame: The assembled parameter dataframe.
    """
    # None defaults (rather than mutable dict defaults) keep calls independent
    if common_years is None:
        common_years = dict(
            year_vtg="broadcast", year_rel="broadcast", year_act="broadcast"
        )
    if common_cols is None:
        common_cols = dict(mode="M1", time="year", time_origin="year", time_dest="year")
    df_export = message_ix.make_df(
        par_name,
        node_loc=network_df["exporter"],
        technology=network_df["export_technology"],
        **col_values,
        **common_years,
        **common_cols,
    )
    df = df_export.copy()
    if not export_only:
        df_import = message_ix.make_df(
            par_name,
            node_loc=network_df["importer"],
            technology=network_df["import_technology"],
            **col_values,
            **common_years,
            **common_cols,
        )
        df = pd.concat([df, df_import])
    return df
# %% Generate input parameter (trade technology)
# %% Generate output parameter (trade technology)
def build_output(
    tec: str,
    network_setup: dict,
    config_dict: dict,
    common_years: dict,
    common_cols: dict,
    parameter_outputs: dict,
):
    """
    Generate output parameter (trade technology).

    Builds two row sets: the export technology delivering from exporter
    to importer at the trade level, and the import technology delivering
    within the importer at the import level. The result is stored under
    ``parameter_outputs[tec]["trade"]["output"]``.

    Args:
        tec: Trade technology name.
        network_setup: Per-technology network dataframes.
        config_dict: Per-parameter, per-technology configuration values.
        common_years: Year-column defaults shared across parameters.
        common_cols: Mode/time-column defaults shared across parameters.
        parameter_outputs: Accumulator dict, updated in place.

    Returns:
        dict: The updated *parameter_outputs*.
    """
    # Trade Level: export technology moves the commodity exporter -> importer
    df_output_trade = message_ix.make_df(
        "output",
        node_loc=network_setup[tec]["exporter"],
        node_dest=network_setup[tec]["importer"],
        technology=network_setup[tec]["export_technology"],
        commodity=config_dict["trade_commodity"][tec],
        level=config_dict["trade_level"][tec],
        value=1,
        unit=config_dict["trade_units"][tec],
        **common_years,
        **common_cols,
    )
    # Import Level: import technology delivers within the importer region
    df_output_import = message_ix.make_df(
        "output",
        node_loc=network_setup[tec]["importer"],
        node_dest=network_setup[tec]["importer"],
        technology=network_setup[tec]["import_technology"],
        commodity=config_dict["trade_commodity"][tec],
        level=config_dict["import_level"][tec],
        value=1,
        unit=config_dict["trade_units"][tec],
        **common_years,
        **common_cols,
    )
    df_output = pd.concat([df_output_trade, df_output_import]).drop_duplicates()
    parameter_outputs[tec]["trade"]["output"] = df_output
    return parameter_outputs
# %% Generate technical lifetime parameter (trade technology)
def build_technical_lifetime(
    tec: str, network_setup: dict, parameter_outputs: dict, **kwargs
):
    """
    Generate technical lifetime parameter (trade technology).

    Args:
        tec: Trade technology name.
        network_setup: Per-technology network dataframes.
        parameter_outputs: Accumulator dict, updated in place.
        **kwargs: Ignored; allows a uniform call signature with the
            other ``build_*`` helpers.

    Returns:
        dict: The updated *parameter_outputs*.
    """
    df_teclt = build_parameterdf(
        "technical_lifetime",
        network_df=network_setup[tec],
        col_values=dict(
            value=1,  # Make 1 year by default
            unit="y",
        ),
    )
    parameter_outputs[tec]["trade"]["technical_lifetime"] = df_teclt.drop_duplicates()
    return parameter_outputs
# %% Generate historical activity parameter (trade technology)
def build_historical_activity(
    tec: str, network_setup: dict, config_dict: dict, parameter_outputs: dict, **kwargs
):
    """
    Generate historical activity parameter (trade technology).

    Creates one empty-valued row set per 5-year step from 2000 to 2020,
    to be filled in by the user or a later calibration step.

    Args:
        tec: Trade technology name.
        network_setup: Per-technology network dataframes.
        config_dict: Per-parameter, per-technology configuration values.
        parameter_outputs: Accumulator dict, updated in place.
        **kwargs: Ignored; allows a uniform call signature.

    Returns:
        dict: The updated *parameter_outputs*.
    """
    df_hist = pd.DataFrame()
    for y in range(2000, 2025, 5):
        ydf = build_parameterdf(
            "historical_activity",
            network_df=network_setup[tec],
            col_values=dict(unit=config_dict["trade_units"][tec]),
        )
        ydf["year_act"] = y
        df_hist = pd.concat([df_hist, ydf])
    parameter_outputs[tec]["trade"]["historical_activity"] = df_hist.drop_duplicates()
    return parameter_outputs
# %% Generate costs for trade technology
def build_costs(
    tec: str, network_setup: dict, config_dict: dict, parameter_outputs: dict, **kwargs
):
    """
    Generate costs for trade technology.

    Creates empty-valued base files for ``inv_cost``, ``fix_cost`` and
    ``var_cost``, with units of USD per trade unit.

    Args:
        tec: Trade technology name.
        network_setup: Per-technology network dataframes.
        config_dict: Per-parameter, per-technology configuration values.
        parameter_outputs: Accumulator dict, updated in place.
        **kwargs: Ignored; allows a uniform call signature.

    Returns:
        dict: The updated *parameter_outputs*.
    """
    # Create base files: inv_cost, fix_cost, var_cost
    for cost_par in ["inv_cost", "fix_cost", "var_cost"]:
        df_cost = build_parameterdf(
            cost_par,
            network_df=network_setup[tec],
            col_values=dict(unit="USD/" + config_dict["trade_units"][tec]),
        )
        parameter_outputs[tec]["trade"][cost_par] = df_cost.drop_duplicates()
    return parameter_outputs
# %% Generate capacity factor parameter (trade technology)
def build_capacity_factor(
    tec: str, network_setup: dict, config_dict: dict, parameter_outputs: dict, **kwargs
):
    """
    Generate capacity factor parameter (trade technology).

    Defaults to a capacity factor of 1 for both export and import
    technologies.

    Args:
        tec: Trade technology name.
        network_setup: Per-technology network dataframes.
        config_dict: Per-parameter, per-technology configuration values.
        parameter_outputs: Accumulator dict, updated in place.
        **kwargs: Ignored; allows a uniform call signature.

    Returns:
        dict: The updated *parameter_outputs*.
    """
    df_cf = build_parameterdf(
        "capacity_factor",
        network_df=network_setup[tec],
        col_values=dict(value=1, unit="%"),
    )
    parameter_outputs[tec]["trade"]["capacity_factor"] = df_cf.drop_duplicates()
    return parameter_outputs
# %% Generate constraints for trade technology
def build_constraints(
    tec: str, network_setup: dict, config_dict: dict, parameter_outputs: dict, **kwargs
):
    """
    Generate constraints for trade technology.

    Builds lower/upper bound dataframes for the activity-constraint
    parameters. Most are left empty-valued; growth and initial activity
    get defaults (±5 %/yr growth, initial activity of 2).

    Args:
        tec: Trade technology name.
        network_setup: Per-technology network dataframes.
        config_dict: Per-parameter, per-technology configuration values.
        parameter_outputs: Accumulator dict, updated in place.
        **kwargs: Ignored; allows a uniform call signature.

    Returns:
        dict: The updated *parameter_outputs*.
    """
    # Default values for selected (parameter, bound) combinations; other
    # parameters keep an empty value column for the user to fill in.
    default_values = {
        ("growth_activity", "lo"): -0.05,
        ("growth_activity", "up"): 0.05,
        ("initial_activity", "lo"): 2,
        ("initial_activity", "up"): 2,
    }
    for par_name in [
        "initial_activity",
        "abs_cost_activity_soft",
        "growth_activity",
        "level_cost_activity_soft",
        "soft_activity",
    ]:
        for t in ["lo", "up"]:
            df_con = build_parameterdf(
                par_name + "_" + t,
                network_df=network_setup[tec],
                col_values=dict(unit=config_dict["trade_units"][tec]),
            )
            if (par_name, t) in default_values:
                df_con["value"] = default_values[(par_name, t)]
            df_con = df_con.drop_duplicates()
            parameter_outputs[tec]["trade"][par_name + "_" + t] = df_con
    return parameter_outputs
def build_domestic_coalgas_relation(
    tec: str,
    network_setup: dict,
    config_dict: dict,
    common_years: dict,
    common_cols: dict,
    parameter_outputs: dict,
    **kwargs,
):
    """
    Generate domestic coal and gas relation parameter (trade technology).

    Only applies to piped gas: the import technology contributes (with
    value -1) to the ``domestic_coal`` and ``domestic_gas`` relations in
    the importing region. For every other technology this is a no-op.

    Args:
        tec: Trade technology name.
        network_setup: Per-technology network dataframes.
        config_dict: Per-parameter, per-technology configuration values.
        common_years: Year-column defaults shared across parameters.
        common_cols: Mode/time-column defaults shared across parameters.
        parameter_outputs: Accumulator dict, updated in place.
        **kwargs: Ignored; allows a uniform call signature.

    Returns:
        dict: The updated *parameter_outputs*.
    """
    if tec in ["gas_piped"]:
        for rel_act in ["domestic_coal", "domestic_gas"]:
            df_rel = message_ix.make_df(
                "relation_activity",
                node_loc=network_setup[tec]["importer"],
                node_rel=network_setup[tec]["importer"],
                technology=network_setup[tec]["import_technology"],
                commodity=config_dict["trade_commodity"][tec],
                value=-1,
                unit="???",  # placeholder unit; filled downstream
                relation=rel_act,
                **common_years,
                **common_cols,
            )
            df_rel = df_rel.drop_duplicates()
            parameter_outputs[tec]["trade"]["relation_activity_" + rel_act] = df_rel
    return parameter_outputs
# %% Generate emission factor parameter (trade technology)
def build_emission_factor(
    tec: str, network_setup: dict, config_dict: dict, parameter_outputs: dict, **kwargs
):
    """
    Generate emission factor parameter (trade technology).

    One empty-valued row set is built per tracked emission species; if
    no emissions are tracked for *tec* this is a no-op.

    Args:
        tec: Trade technology name.
        network_setup: Per-technology network dataframes.
        config_dict: Per-parameter, per-technology configuration values.
        parameter_outputs: Accumulator dict, updated in place.
        **kwargs: Ignored; allows a uniform call signature.

    Returns:
        dict: The updated *parameter_outputs*.
    """
    if config_dict["tracked_emissions"][tec] is not None:
        df_ef = pd.DataFrame()
        for emission_type in config_dict["tracked_emissions"][tec]:
            df_ef_t = build_parameterdf(
                "emission_factor",
                network_df=network_setup[tec],
                col_values=dict(unit=None, emission=emission_type),
            )
            df_ef = pd.concat([df_ef, df_ef_t])
        parameter_outputs[tec]["trade"]["emission_factor"] = df_ef.drop_duplicates()
    return parameter_outputs
# %% Generate CO2 relation accounting
def build_accounting_relations(
    tec: str,
    network_setup: dict,
    config_dict: dict,
    common_years: dict,
    common_cols: dict,
    parameter_outputs: dict,
    **kwargs,
):
    """
    Relations for accounting purposes:
    CO2 emissions, primary energy total, aggregate exports,
    regional exports, regional imports.

    Args:
        tec: Trade technology name.
        network_setup: Per-technology network dataframes.
        config_dict: Per-parameter, per-technology configuration values.
        common_years: Year-column defaults shared across parameters.
        common_cols: Mode/time-column defaults shared across parameters.
        parameter_outputs: Accumulator dict, updated in place.
        **kwargs: Ignored; allows a uniform call signature.

    Returns:
        dict: The updated *parameter_outputs*.
    """

    def _relation_df(nodes, technologies, relation, value=None):
        # One deduplicated relation_activity dataframe; node_loc == node_rel.
        # value=None leaves the value column unset, same as omitting it.
        return message_ix.make_df(
            "relation_activity",
            node_loc=nodes,
            node_rel=nodes,
            technology=technologies,
            commodity=config_dict["trade_commodity"][tec],
            value=value,
            unit="???",  # placeholder unit; filled downstream
            relation=relation,
            **common_years,
            **common_cols,
        ).drop_duplicates()

    exp_nodes = network_setup[tec]["exporter"]
    imp_nodes = network_setup[tec]["importer"]
    exp_tecs = network_setup[tec]["export_technology"]
    imp_tecs = network_setup[tec]["import_technology"]
    trade_tec = config_dict["trade_technology"][tec]
    out = parameter_outputs[tec]["trade"]
    # CO2 emissions (value left unset for later specification)
    out["relation_activity_CO2_Emission"] = _relation_df(
        exp_nodes, exp_tecs, "CO2_Emission"
    )
    # Primary energy total
    out["relation_activity_PE_total_traditional"] = _relation_df(
        exp_nodes, exp_tecs, "PE_total_traditional", value=-1
    )
    # Aggregate exports
    out["relation_activity_global_aggregate"] = _relation_df(
        exp_nodes, exp_tecs, trade_tec + "_exp_global", value=1
    )
    # Regional exports
    out["relation_activity_regional_exp"] = _relation_df(
        exp_nodes, exp_tecs, trade_tec + "_exp", value=1
    )
    # Regional imports
    out["relation_activity_regional_imp"] = _relation_df(
        imp_nodes, imp_tecs, trade_tec + "_imp", value=1
    )
    return parameter_outputs
# %% Generate flow technology input parameter
# %% Generate flow technology output parameter
def build_flow_output(
    flow_tec: str,
    tec: str,
    network_setup: dict,
    config_dict: dict,
    common_years: dict,
    common_cols: dict,
    parameter_outputs: dict,
    message_regions: str = "R12",
    **kwargs,
):
    """
    Generate flow technology output parameter.

    For "bilateral" flow constraints (e.g. pipelines) the flow
    technology and its output commodity are suffixed with the importer
    region; for "global" constraints (e.g. shipping) a single technology
    delivers to the global node. When a bunker technology is configured,
    an additional global bunker-fuel output is appended.

    Args:
        flow_tec: Flow technology name (e.g., a pipeline or vessel type).
        tec: Trade technology name.
        network_setup: Per-technology network dataframes.
        config_dict: Per-parameter, per-technology configuration values.
        common_years: Year-column defaults shared across parameters.
        common_cols: Mode/time-column defaults shared across parameters.
        parameter_outputs: Accumulator dict, updated in place.
        message_regions: Regional resolution prefix for the global node.
        **kwargs: Ignored; allows a uniform call signature.

    Returns:
        dict: The updated *parameter_outputs*.
    """
    df_output = pd.DataFrame()
    # If pipelines technologies and outputs are bilateral
    if config_dict["flow_constraint"][tec] == "bilateral":
        df_output_base = message_ix.make_df(
            "output",
            node_loc=network_setup[tec]["exporter"],
            node_dest=network_setup[tec]["exporter"],
            technology=flow_tec
            + "_"
            + network_setup[tec]["importer"].str.lower().str.split("_").str[1],
            commodity=config_dict["flow_commodity_output"][tec]
            + "_"
            + network_setup[tec]["importer"].str.lower().str.split("_").str[1],
            unit=config_dict["flow_units"][tec],
            level=config_dict["trade_level"][tec],
            **common_years,
            **common_cols,
        )
        df_output = pd.concat([df_output, df_output_base])
    # If shipping, technologies and outputs are global with diff modes
    elif config_dict["flow_constraint"][tec] == "global":
        df_output_base = message_ix.make_df(
            "output",
            node_loc=network_setup[tec]["exporter"],
            node_dest=message_regions + "_GLB",
            mode="M1",
            technology=flow_tec,
            commodity=config_dict["flow_commodity_output"][tec],
            unit=config_dict["flow_units"][tec],
            level=config_dict["trade_level"][tec],
            time="year",
            time_dest="year",
            **common_years,
        )
        df_output = pd.concat([df_output, df_output_base])
    # For shipped trade, set up bunker fuels
    if config_dict["bunker_technology"][tec] is not None:
        # Global bunker: mirror the flow output rows with the bunker technology
        df_output_gbunk = df_output.copy()
        df_output_gbunk["technology"] = (
            "bunker_global_" + config_dict["flow_fuel_input"][tec][flow_tec][0]
        )
        df_output_gbunk["node_dest"] = message_regions + "_GLB"
        df_output_gbunk["level"] = "bunker"
        df_output_gbunk["unit"] = "GWa"
        df_output_gbunk["mode"] = "M1"
        df_output_gbunk["commodity"] = config_dict["flow_fuel_input"][tec][flow_tec][0]
        df_output = pd.concat([df_output, df_output_gbunk])
    df_output = df_output.drop_duplicates()
    df_output["value"] = 1
    output_out = pd.concat([parameter_outputs[tec]["flow"]["output"], df_output])
    parameter_outputs[tec]["flow"]["output"] = output_out
    return parameter_outputs
# %% Generate flow technology capacity constraints parameter
def build_flow_capacity_constraints(
    par: str,
    flow_tec: str,
    tec: str,
    network_setup: dict,
    config_dict: dict,
    common_years: dict,
    common_cols: dict,
    parameter_outputs: dict,
    **kwargs,
):
    """
    Generate flow technology capacity constraints parameter.

    Builds lower and upper bound dataframes for *par*, using
    importer-suffixed technologies for "bilateral" flow constraints and
    a single technology for "global" ones.

    Args:
        par: Constraint parameter stem (e.g., "growth_new_capacity").
        flow_tec: Flow technology name.
        tec: Trade technology name.
        network_setup: Per-technology network dataframes.
        config_dict: Per-parameter, per-technology configuration values.
        common_years: Year-column defaults shared across parameters.
        common_cols: Mode/time-column defaults shared across parameters.
        parameter_outputs: Accumulator dict, updated in place.
        **kwargs: Ignored; allows a uniform call signature.

    Returns:
        dict: The updated *parameter_outputs*.
    """
    for t in ["lo", "up"]:
        # NOTE(review): if flow_constraint is neither "bilateral" nor "global",
        # df_con is unbound and the concat below raises NameError — presumably
        # config validation upstream prevents this; confirm.
        if config_dict["flow_constraint"][tec] == "bilateral":
            df_con = message_ix.make_df(
                par + "_" + t,
                node_loc=network_setup[tec]["exporter"],
                technology=flow_tec
                + "_"
                + network_setup[tec]["importer"].str.lower().str.split("_").str[1],
                unit=config_dict["flow_units"][tec],
                **common_years,
                **common_cols,
            )
        elif config_dict["flow_constraint"][tec] == "global":
            df_con = message_ix.make_df(
                par + "_" + t,
                node_loc=network_setup[tec]["exporter"],
                technology=flow_tec,
                unit=config_dict["flow_units"][tec],
                **common_years,
                **common_cols,
            )
        con_out = pd.concat([parameter_outputs[tec]["flow"][par + "_" + t], df_con])
        parameter_outputs[tec]["flow"][par + "_" + t] = con_out
    return parameter_outputs
# %% Generate flow technology costs parameter
def build_flow_FIcosts(
    flow_tec: str,
    tec: str,
    network_setup: dict,
    config_dict: dict,
    common_years: dict,
    common_cols: dict,
    parameter_outputs: dict,
    **kwargs,
):
    """
    Generate flow technology fixed and investment costs parameters.

    Builds ``fix_cost`` and ``inv_cost`` dataframes; shipped trade gets
    a default value of 0.04, other technologies are left empty-valued.

    Args:
        flow_tec: Flow technology name.
        tec: Trade technology name.
        network_setup: Per-technology network dataframes.
        config_dict: Per-parameter, per-technology configuration values.
        common_years: Year-column defaults shared across parameters.
        common_cols: Mode/time-column defaults shared across parameters.
        parameter_outputs: Accumulator dict, updated in place.
        **kwargs: Ignored; allows a uniform call signature.

    Returns:
        dict: The updated *parameter_outputs*.
    """
    for cost_par in ["fix_cost", "inv_cost"]:
        if config_dict["flow_constraint"][tec] == "bilateral":
            # Bilateral flows: one technology per importer region
            tec_use = (
                flow_tec
                + "_"
                + network_setup[tec]["importer"].str.lower().str.split("_").str[1]
            )
            df_cost = message_ix.make_df(
                cost_par,
                node_loc=network_setup[tec]["exporter"],
                technology=tec_use,
                unit="USD/" + config_dict["flow_units"][tec],
                **common_years,
                **common_cols,
            )
        elif config_dict["flow_constraint"][tec] == "global":
            df_cost = message_ix.make_df(
                cost_par,
                node_loc=network_setup[tec]["exporter"],
                technology=flow_tec,
                unit="USD/" + config_dict["flow_units"][tec],
                **common_years,
                **common_cols,
            )
        df_cost = df_cost.drop_duplicates()
        if "shipped" in tec:
            df_cost["value"] = 0.04  # Default is 0.04 USD/GWa
        cost_out = pd.concat([parameter_outputs[tec]["flow"][cost_par], df_cost])
        parameter_outputs[tec]["flow"][cost_par] = cost_out
    return parameter_outputs
def build_flow_Vcosts(
    flow_tec: str,
    tec: str,
    network_setup: dict,
    config_dict: dict,
    common_years: dict,
    common_cols: dict,
    parameter_outputs: dict,
    message_regions: str = "R12",
    **kwargs,
):
    """
    Generate flow technology variable costs parameter.

    Bilateral flows use one importer-suffixed technology per pair;
    global flows use one technology with a mode per exporter-importer
    pair (e.g., "nam-weu"). Shipped trade gets a default value of 0.002.

    Args:
        flow_tec: Flow technology name.
        tec: Trade technology name.
        network_setup: Per-technology network dataframes.
        config_dict: Per-parameter, per-technology configuration values.
        common_years: Year-column defaults shared across parameters.
        common_cols: Mode/time-column defaults shared across parameters.
        parameter_outputs: Accumulator dict, updated in place.
        message_regions: Regional resolution prefix stripped from node names.
        **kwargs: Ignored; allows a uniform call signature.

    Returns:
        dict: The updated *parameter_outputs*.
    """
    if config_dict["flow_constraint"][tec] == "bilateral":
        tec_use = (
            flow_tec
            + "_"
            + network_setup[tec]["importer"].str.lower().str.split("_").str[1]
        )
        df_vcost_base = message_ix.make_df(
            "var_cost",
            node_loc=network_setup[tec]["exporter"],
            technology=tec_use,
            unit="USD/" + config_dict["flow_units"][tec],
            **common_years,
            **common_cols,
        )
    elif config_dict["flow_constraint"][tec] == "global":
        # Encode each exporter-importer pair as a mode, e.g. "nam-weu"
        exp0 = network_setup[tec]["exporter"].str.replace(message_regions + "_", "")
        imp0 = network_setup[tec]["importer"].str.replace(message_regions + "_", "")
        mode_use = exp0 + "-" + imp0
        df_vcost_base = message_ix.make_df(
            "var_cost",
            node_loc=network_setup[tec]["exporter"],
            mode=mode_use,
            technology=flow_tec,
            unit="USD/" + config_dict["flow_units"][tec],
            time="year",
            **common_years,
        )
    if "shipped" in tec:
        df_vcost_base["value"] = 0.002  # Default is 0.002 USD/Mt-km
    vcost_out = pd.concat([parameter_outputs[tec]["flow"]["var_cost"], df_vcost_base])
    parameter_outputs[tec]["flow"]["var_cost"] = vcost_out
    return parameter_outputs
# %% Generate flow technology capacity factor parameter
def build_flow_capacity_factor(
    flow_tec: str,
    tec: str,
    network_setup: dict,
    config_dict: dict,
    common_years: dict,
    common_cols: dict,
    parameter_outputs: dict,
    **kwargs,
):
    """
    Generate flow technology capacity factor parameter.

    Starts from export-side rows (capacity factor 1), renames the
    technology column for the flow technology, and appends a global
    bunker-fuel technology when one is configured.

    Args:
        flow_tec: Flow technology name.
        tec: Trade technology name.
        network_setup: Per-technology network dataframes.
        config_dict: Per-parameter, per-technology configuration values.
        common_years: Year-column defaults shared across parameters.
        common_cols: Mode/time-column defaults shared across parameters.
        parameter_outputs: Accumulator dict, updated in place.
        **kwargs: Ignored; allows a uniform call signature.

    Returns:
        dict: The updated *parameter_outputs*.
    """
    df_cf_base = build_parameterdf(
        "capacity_factor",
        network_df=network_setup[tec],
        col_values=dict(value=1, unit="%"),
        export_only=True,
    )
    if config_dict["flow_constraint"][tec] == "bilateral":
        # Reuse the importer suffix from the export technology name
        df_cf_base["technology"] = (
            flow_tec + "_" + df_cf_base["technology"].str.lower().str.split("_").str[-1]
        )
    elif config_dict["flow_constraint"][tec] == "global":
        df_cf_base["technology"] = flow_tec
    df_cf_base = df_cf_base.drop_duplicates()
    if config_dict["bunker_technology"][tec] is not None:
        # Add global bunker fuel technology
        bdf = df_cf_base.copy()
        bdf["technology"] = (
            "bunker_global_" + config_dict["flow_fuel_input"][tec][flow_tec][0]
        )
        df_cf_base = pd.concat([df_cf_base, bdf])
    cf_out = pd.concat([parameter_outputs[tec]["flow"]["capacity_factor"], df_cf_base])
    parameter_outputs[tec]["flow"]["capacity_factor"] = cf_out
    return parameter_outputs
# %% Generate flow technology technical lifetime parameter
def build_flow_technical_lifetime(
    flow_tec: str,
    tec: str,
    network_setup: dict,
    config_dict: dict,
    common_years: dict,
    common_cols: dict,
    parameter_outputs: dict,
    **kwargs,
):
    """
    Generate flow technology technical lifetime parameter.

    Defaults to a 20-year lifetime; appends a global bunker-fuel
    technology when one is configured.

    Args:
        flow_tec: Flow technology name.
        tec: Trade technology name.
        network_setup: Per-technology network dataframes.
        config_dict: Per-parameter, per-technology configuration values.
        common_years: Year-column defaults shared across parameters.
        common_cols: Mode/time-column defaults shared across parameters.
        parameter_outputs: Accumulator dict, updated in place.
        **kwargs: Ignored; allows a uniform call signature.

    Returns:
        dict: The updated *parameter_outputs*.
    """
    df_teclt_base = build_parameterdf(
        "technical_lifetime",
        network_df=network_setup[tec],
        col_values=dict(
            value=20,  # Default is 20y
            unit="y",
        ),
        export_only=True,
    )
    if config_dict["flow_constraint"][tec] == "bilateral":
        # Reuse the importer suffix from the export technology name
        df_teclt_base["technology"] = (
            flow_tec
            + "_"
            + df_teclt_base["technology"].str.lower().str.split("_").str[-1]
        )
    elif config_dict["flow_constraint"][tec] == "global":
        df_teclt_base["technology"] = flow_tec
    df_teclt_base = df_teclt_base.drop_duplicates()
    if config_dict["bunker_technology"][tec] is not None:
        # Add global bunker fuel technology
        bdf = df_teclt_base.copy()
        bdf["technology"] = (
            "bunker_global_" + config_dict["flow_fuel_input"][tec][flow_tec][0]
        )
        df_teclt_base = pd.concat([df_teclt_base, bdf])
    teclt_out = pd.concat(
        [parameter_outputs[tec]["flow"]["technical_lifetime"], df_teclt_base]
    )
    parameter_outputs[tec]["flow"]["technical_lifetime"] = teclt_out
    return parameter_outputs
# %% Flow as trade input
# %% Export edit files
def export_edit_files(
    covered_tec: list[str],
    log: logging.Logger,
    data_path: Path,
    parameter_outputs: dict,
):
    """
    Export edit files.

    Writes every accumulated trade and flow parameter dataframe to CSV
    under ``<tec>/edit_files``, then seeds ``<tec>/bare_files`` with the
    required parameter files that do not already exist there.

    Args:
        covered_tec: Trade technology names to export.
        log: Logger used to report progress.
        data_path: Root of the bilateralize data folder.
        parameter_outputs: Accumulated parameter dataframes per
            technology, keyed by "trade" and "flow".
    """
    for tec in covered_tec:
        log.info(f"Exporting trade parameters for {tec}")
        for parname in parameter_outputs[tec]["trade"].keys():
            parameter_outputs[tec]["trade"][parname].to_csv(
                os.path.join(data_path, tec, "edit_files", parname + ".csv"),
                index=False,
            )
            log.info(f"...trade {parname}")
        for parname in parameter_outputs[tec]["flow"].keys():
            parameter_outputs[tec]["flow"][parname].to_csv(
                os.path.join(
                    data_path, tec, "edit_files", "flow_technology", parname + ".csv"
                ),
                index=False,
            )
            log.info(f"...flow {parname}")
    ## Transfer files from edit to bare if they do not already exist
    for tec in covered_tec:
        reqpar_list = [
            "capacity_factor.csv",
            "input.csv",
            "output.csv",
            "technical_lifetime.csv",
            # necessary for legacy reporting
            "relation_activity_regional_exp.csv",
            os.path.join("flow_technology", "capacity_factor.csv"),
            os.path.join("flow_technology", "input.csv"),
            os.path.join("flow_technology", "output.csv"),
            os.path.join("flow_technology", "technical_lifetime.csv"),
        ]
        for reqpar in reqpar_list:
            if not os.path.isfile(os.path.join(data_path, tec, "bare_files", reqpar)):
                base_file = os.path.join(data_path, tec, "edit_files", reqpar)
                dest_file = os.path.join(data_path, tec, "bare_files", reqpar)
                shutil.copy2(base_file, dest_file)
                log.info(f"Copied file from edit to bare: {reqpar}")
    ## Transfer cost parameters for flow technologies using shipping from edit to bare
    for tec in covered_tec:
        if "shipped" in tec:
            required_parameters = [
                os.path.join("flow_technology", "var_cost.csv"),
                os.path.join("flow_technology", "inv_cost.csv"),
            ]
            for reqpar in required_parameters:
                if not os.path.isfile(
                    os.path.join(data_path, tec, "bare_files", reqpar)
                ):
                    base_file = os.path.join(data_path, tec, "edit_files", reqpar)
                    dest_file = os.path.join(data_path, tec, "bare_files", reqpar)
                    shutil.copy2(base_file, dest_file)
                    log.info(f"Copied file from edit to bare: {reqpar}")
# %% Main function to generate bare sheets
def generate_edit_files(
    log,
    project_name: str | None = None,
    config_name: str | None = None,
    message_regions: str = "R12",
):
    """
    Generate bare sheets to collect required parameters.

    Orchestrates the whole first step: loads configuration, creates the
    folder tree, defines per-technology networks, builds all trade and
    flow parameter dataframes, and exports them as edit/bare CSV files.

    Args:
        log (log, required): Log file to track progress
        project_name (str, optional): Project name (message_ix_models/project/[THIS])
        config_name (str, optional): Name of the config file.
            If None, uses default config from data/bilateralize/config_default.yaml
        message_regions (str, optional): Default is R12 regionality.
            NOTE(review): this argument is overwritten below by the value
            from the config's scenario section — confirm that is intended.
    """
    data_path = package_data_path("bilateralize")
    data_path = Path(os.path.join(os.path.dirname(data_path), "bilateralize"))
    # Load config
    log.info("Loading config file")
    config_base, config_path, config_tec = load_config(
        project_name, config_name, load_tec_config=True
    )
    # Retrieve config sections
    message_regions = config_base.get("scenario", {}).get("regions")
    covered_tec = config_base.get("covered_trade_technologies", {})
    # Re-key the per-technology configs as config_dict[parameter][technology]
    config_dict: Dict[str, Dict[str, Any]] = {}
    for tec in covered_tec:
        tec_dict = config_tec.get(tec).get(tec + "_trade", {})
        for k in tec_dict.keys():
            if k not in config_dict.keys():
                config_dict[k] = {}
            config_dict[k][tec] = tec_dict[k]
    # Create folders for trade technologies (if missing)
    folders_for_trade(covered_tec)
    # Generate full combination of nodes to build technology-specific network
    network_setup = define_networks(
        log, message_regions, covered_tec, config_dict, data_path
    )
    # Common values across parameters
    common_years = dict(
        year_vtg="broadcast", year_act="broadcast", year_rel="broadcast"
    )
    common_cols = dict(mode="M1", time="year", time_origin="year", time_dest="year")
    parameter_outputs: Dict[str, Dict[str, Any]] = {}  # This will get populated
    for tec in covered_tec:
        parameter_outputs[tec] = dict(trade=dict(), flow=dict())
    # Populate parameter_outputs dictionaries
    # (build_input is defined elsewhere in this module)
    for tec in covered_tec:
        tec_args = {
            "tec": tec,
            "network_setup": network_setup,
            "config_dict": config_dict,
            "common_years": common_years,
            "common_cols": common_cols,
            "parameter_outputs": parameter_outputs,
        }
        # Trade input
        parameter_outputs = build_input(**tec_args)
        # Trade output
        parameter_outputs = build_output(**tec_args)
        # Trade technical lifetime
        parameter_outputs = build_technical_lifetime(**tec_args)
        # Trade historical activity
        parameter_outputs = build_historical_activity(**tec_args)
        # Trade costs
        parameter_outputs = build_costs(**tec_args)
        # Trade capacity factor
        parameter_outputs = build_capacity_factor(**tec_args)
        # Trade constraints
        parameter_outputs = build_constraints(**tec_args)
        # Trade emission factor
        parameter_outputs = build_emission_factor(**tec_args)
        # For gas- imports require relation to domestic_coal and domestic_gas
        parameter_outputs = build_domestic_coalgas_relation(**tec_args)
        # Accounting relations
        parameter_outputs = build_accounting_relations(**tec_args)
    ## FLOW TECHNOLOGY ##
    #####################
    # Create file: Input
    for tec in covered_tec:
        # Seed empty accumulators so the build_flow_* helpers can concatenate
        parameter_outputs[tec]["flow"]["input"] = pd.DataFrame()
        parameter_outputs[tec]["flow"]["output"] = pd.DataFrame()
        for par in ["growth_new_capacity", "initial_new_capacity"]:
            parameter_outputs[tec]["flow"][par + "_up"] = pd.DataFrame()
            parameter_outputs[tec]["flow"][par + "_lo"] = pd.DataFrame()
        parameter_outputs[tec]["flow"]["fix_cost"] = pd.DataFrame()
        parameter_outputs[tec]["flow"]["inv_cost"] = pd.DataFrame()
        parameter_outputs[tec]["flow"]["var_cost"] = pd.DataFrame()
        parameter_outputs[tec]["flow"]["capacity_factor"] = pd.DataFrame()
        parameter_outputs[tec]["flow"]["technical_lifetime"] = pd.DataFrame()
        for flow_tec in config_dict["flow_technologies"][tec]:
            flow_args = {
                "flow_tec": flow_tec,
                "tec": tec,
                "network_setup": network_setup,
                "config_dict": config_dict,
                "common_years": common_years,
                "common_cols": common_cols,
                "parameter_outputs": parameter_outputs,
                "message_regions": message_regions,
                "data_path": data_path,
            }
            # Flow input (build_flow_input is defined elsewhere in this module)
            parameter_outputs = build_flow_input(**flow_args)
            # Flow output
            parameter_outputs = build_flow_output(**flow_args)
            # Flow capacity constraints
            for par in ["growth_new_capacity", "initial_new_capacity"]:
                parameter_outputs = build_flow_capacity_constraints(
                    par=par, **flow_args
                )
            # Flow costs
            parameter_outputs = build_flow_FIcosts(**flow_args)
            parameter_outputs = build_flow_Vcosts(**flow_args)
            # Flow capacity factor
            parameter_outputs = build_flow_capacity_factor(**flow_args)
            # Flow technical lifetime
            parameter_outputs = build_flow_technical_lifetime(**flow_args)
            # Add flow technology outputs as an input into the trade technology
            parameter_outputs = flow_as_trade_input(**flow_args)
    ## Export files
    export_edit_files(covered_tec, log, data_path, parameter_outputs)
def prepare_edit_files(
    project_name: str | None = None,
    config_name: str | None = None,
    P_access: bool = True,
):
    """
    Prepare edit files for bilateralize tool.

    End-to-end driver: computes inter-regional distances, generates the
    edit/bare parameter sheets, imports Global Energy Monitor pipeline
    calibrations (if the P: drive is accessible), runs MariTEAM
    calibration, and derives fix costs from historical prices.

    Args:
        project_name: Name of the project (e.g., 'newpathways')
        config_name: Name of the config file (e.g., 'config.yaml')
        P_access: User has access to P: drive
    """
    # Get logger
    log = get_logger(__name__)
    # Bring in configuration
    config, config_path = load_config(
        project_name=project_name, config_name=config_name
    )
    covered_tec = config["covered_trade_technologies"]
    message_regions = config["scenario"]["regions"]
    data_path = package_data_path("bilateralize")
    data_path = Path(os.path.join(os.path.dirname(data_path), "bilateralize"))
    # Calculate distances
    calculate_distance(message_regions)
    # Generate bare sheets
    generate_edit_files(
        log=log,
        project_name=project_name,
        config_name=config_name,
        message_regions=message_regions,
    )
    # Import calibration files from Global Energy Monitor
    if P_access is True:
        import_gem(
            input_file="GEM-GGIT-Gas-Pipelines-2024-12.csv",
            # input_sheet="Gas Pipelines 2024-12-17",
            trade_technology="gas_piped",
            flow_technology="gas_pipe",
            flow_commodity="gas_pipeline_capacity",
            project_name=project_name,
            config_name=config_name,
        )
        for tradetec in ["crudeoil_piped", "foil_piped", "loil_piped"]:
            import_gem(
                input_file="GEM-GOIT-Oil-NGL-Pipelines-2025-03.csv",
                # input_sheet="Pipelines",
                trade_technology=tradetec,
                flow_technology="oil_pipe",
                flow_commodity="oil_pipeline_capacity",
                project_name=project_name,
                config_name=config_name,
            )
    # Add MariTEAM calibration for maritime shipping
    calibrate_mariteam(
        covered_tec,
        message_regions,
        project_name=project_name,
        config_name=config_name,
    )
    # Add variable costs for shipped commodities
    costdf = build_historical_price(
        message_regions, project_name=project_name, config_name=config_name
    )
    # Align historical-price technology names with model naming
    costdf["technology"] = costdf["technology"].str.replace("ethanol_", "eth_")
    costdf["technology"] = costdf["technology"].str.replace("fueloil_", "foil_")
    for tec in [i for i in covered_tec if i != "gas_piped"]:
        log.info("Add fix cost for " + tec)
        if "piped" in tec:
            # Pipelines reuse the shipped-commodity prices, renamed
            tec_shipped = tec.replace("piped", "shipped")
            add_df = costdf[costdf["technology"].str.contains(tec_shipped)].copy()
            add_df["technology"] = add_df["technology"].str.replace(
                "shipped", "piped"
            )
        else:
            add_df = costdf[costdf["technology"].str.contains(tec)]
        col_list = add_df.columns
        # Mean cost fills node/year combinations without historical data
        mean_cost = add_df["value"].mean()
        input_df = pd.read_csv(
            os.path.join(data_path, tec, "edit_files", "input.csv")
        )
        input_df = input_df[
            ["node_loc", "technology", "year_act", "year_vtg"]  # "mode", "time"
        ].drop_duplicates()
        add_df = input_df.merge(
            add_df,
            left_on=[
                "node_loc",
                "technology",
                "year_act",
                "year_vtg",
            ],
            right_on=[
                "node_loc",
                "technology",
                "year_act",
                "year_vtg",
            ],
            how="left",
        )
        add_df["value"] = np.where(
            add_df["value"].isnull(), mean_cost, add_df["value"]
        )
        # NOTE(review): /5 presumably converts 5-year-period totals to annual
        # values — confirm the intended conversion factor.
        add_df["value"] = round(add_df["value"] / 5, 0)
        add_df["unit"] = "USD/GWa"
        add_df = add_df[col_list]
        # Fix costs apply to export technologies only
        add_df = add_df[~add_df["technology"].str.contains("_imp")]
        add_df = add_df[
            ["node_loc", "technology", "year_act", "year_vtg", "value", "unit"]
        ]
        add_df = add_df.drop_duplicates()
        add_df.to_csv(
            os.path.join(data_path, tec, "edit_files", "fix_cost.csv"), index=False
        )
        add_df.to_csv(
            os.path.join(data_path, tec, "bare_files", "fix_cost.csv"), index=False
        )