Source code for message_ix_models.tools.bilateralize.load_and_solve

# -*- coding: utf-8 -*-
"""
Update MESSAGEix scenario(s) with bilateralized dictionary

This script is the third step in implementing the bilateralize tool.
It updates a specified MESSAGEix scenario with the bilateralized dictionary.
It then has options to solve the scenario within the ixmp database
or save as a GDX data file for direct solve in GAMS.
"""

# Import packages
import logging
import os
from pathlib import Path

import ixmp
import message_ix
import pandas as pd

from message_ix_models.tools.bilateralize.utils import get_logger, load_config


# %% Remove existing trade technologies
def remove_trade_tech(scen: message_ix.Scenario, log, config_tec: dict, tec: str):
    """Remove existing trade technologies for ``tec`` from the scenario.

    The removal candidates are:

    - ``<trade_commodity>_exp`` / ``<trade_commodity>_imp``, where the trade
      commodity is read from ``config_tec[tec][tec + "_trade"]``;
    - ``<base>_exp`` / ``<base>_imp``, where ``<base>`` is ``tec`` with the
      ``_shipped`` / ``_piped`` markers stripped;
    - every existing technology whose name contains
      ``<trade_commodity>_exp_``;
    - ``oil_exp`` / ``oil_imp`` when ``tec`` contains ``"crudeoil"``.

    Only candidates that are present in the ``technology`` set are removed.

    Args:
        scen: Scenario to modify
        log: Logger used to report each removed technology
        config_tec: Technology configuration, keyed by technology name
        tec: Name of the covered trade technology
    """
    trade_commodity = config_tec[tec][tec + "_trade"]["trade_commodity"]
    base_tec_name = tec.replace("_shipped", "").replace("_piped", "")

    # Query the technology set once; every membership test below reuses it
    # instead of going back to the scenario for each candidate.
    existing = set(scen.set("technology"))

    candidates = {
        trade_commodity + "_exp",
        trade_commodity + "_imp",
        base_tec_name + "_exp",
        base_tec_name + "_imp",
    }
    export_marker = trade_commodity + "_exp_"
    candidates.update(t for t in existing if export_marker in t)
    if "crudeoil" in tec:
        candidates.update(("oil_exp", "oil_imp"))  # for crude

    # Sorted so that the removal order (and the log output) is reproducible.
    to_remove = sorted(candidates & existing)

    with scen.transact("Remove base trade technologies for " + tec):
        for t in to_remove:
            log.info("Removing base technology..." + t)
            scen.remove_set("technology", t)
# %% Add sets for trade technologies
def add_trade_sets(scen: message_ix.Scenario, log, trade_dict: dict, tec: str):
    """Register the set elements used by the trade (and flow) technologies.

    For each of the sets ``technology``, ``level``, ``commodity`` and
    ``mode``, the unique values of the same-named column are collected from
    the trade ``input`` and ``output`` frames. When the flow dictionary has
    an ``input`` entry, its ``input`` and ``output`` frames contribute as
    well. Elements not yet present in the scenario are then added.

    Args:
        scen: Scenario to modify
        log: Logger used to report each added element
        trade_dict: Dictionary of parameter dataframes, keyed by technology
        tec: Name of the covered trade technology
    """
    set_names = ("technology", "level", "commodity", "mode")
    trade_data = trade_dict[tec]["trade"]
    flow_data = trade_dict[tec]["flow"]

    # Step 1: gather the elements each set needs.
    required = {}
    for set_name in set_names:
        members = set(trade_data["input"][set_name].unique())
        members.update(trade_data["output"][set_name].unique())
        if "input" in flow_data:
            members.update(flow_data["input"][set_name].unique())
            members.update(flow_data["output"][set_name].unique())
        required[set_name] = list(members)

    # Step 2: add whatever the scenario does not have yet.
    with scen.transact("Add new sets for " + tec):
        for set_name in set_names:
            present = list(scen.set(set_name))
            missing = [m for m in required[set_name] if m not in present]
            for member in missing:
                log.info("Adding set: " + set_name + "..." + member)
                scen.add_set(set_name, member)
# %% Add parameters for bilateralized trade
def add_trade_parameters(scen: message_ix.Scenario, log, trade_dict: dict, tec: str):
    """Add the non-relation parameters for bilateralized trade.

    Every key of the trade and flow dictionaries whose name does not contain
    ``"relation_"`` is treated as a parameter name. The trade and flow frames
    for that parameter are stacked, rows with a null ``value`` are dropped,
    and the result is added to the scenario.

    Args:
        scen: Scenario to modify
        log: Logger used to report progress
        trade_dict: Dictionary of parameter dataframes, keyed by technology
        tec: Name of the covered trade technology
    """
    trade_data = trade_dict[tec]["trade"]
    flow_data = trade_dict[tec]["flow"]

    # Sorted so that parameters are added (and logged) in a reproducible order.
    parameter_names = sorted(
        {name for name in (*trade_data, *flow_data) if "relation_" not in name}
    )
    sources = (("trade", trade_data), ("flow", flow_data))

    with scen.transact("Add new parameters for " + tec):
        for par_name in parameter_names:
            log.info("Adding parameter for " + tec + ": " + par_name)

            frames = []
            for label, data in sources:
                frame = data.get(par_name)
                if frame is not None:
                    log.info("... parameter added for " + label + " technology")
                    frames.append(frame)

            # A key may be present with a None value in both dictionaries;
            # there is nothing to add in that case.
            if not frames:
                log.info("... no data supplied; skipping")
                continue

            # Concatenate once, without an empty seed frame.
            pardf = pd.concat(frames, ignore_index=True)
            pardf = pardf[pardf["value"].notnull()]
            scen.add_par(par_name, pardf)
# %% Update relation parameters
def update_relation_parameters(
    scen: message_ix.Scenario, log, trade_dict: dict, tec: str
):
    """Add relation parameters (activity, upper, lower) for ``tec``.

    For each of ``relation_activity``, ``relation_upper`` and
    ``relation_lower``, every trade/flow entry whose key contains that name
    is stacked into one frame. Relations named in the ``relation_activity``
    data that the scenario does not know yet are added to the ``relation``
    set first. Rows with a null ``value`` are dropped before the data is
    added.

    A relation parameter with no data is skipped, so supplying e.g. only
    ``relation_upper`` works.

    Args:
        scen: Scenario to modify
        log: Logger used to report progress
        trade_dict: Dictionary of parameter dataframes, keyed by technology
        tec: Name of the covered trade technology
    """
    sources = (trade_dict[tec]["trade"], trade_dict[tec]["flow"])

    for rel_par in ("relation_activity", "relation_upper", "relation_lower"):
        keys = sorted({k for src in sources for k in src if rel_par in k})
        frames = [
            src[k] for k in keys for src in sources if src.get(k) is not None
        ]
        if not frames:
            continue

        rel_par_df = pd.concat(frames, ignore_index=True)
        if rel_par_df.empty:
            continue

        if rel_par == "relation_activity":
            # Look the relation set up once rather than once per relation.
            known = set(scen.set("relation"))
            new_relations = [
                r for r in rel_par_df["relation"].unique() if r not in known
            ]
            if new_relations:
                with scen.transact("Adding new relation sets"):
                    for nr in new_relations:
                        scen.add_set("relation", nr)

        with scen.transact("Add " + rel_par + " for " + tec):
            log.info("Adding " + rel_par + " for " + tec)
            rel_par_df = rel_par_df[rel_par_df["value"].notnull()]
            scen.add_par(rel_par, rel_par_df)
# %% Update bunker fuels
def update_bunker_fuels(scen: message_ix.Scenario, tec: str, log, config_tec: dict):
    """Move the ``input`` of bunker technologies to the ``bunker`` level.

    The bunker technologies are read from
    ``config_tec[tec][tec + "_trade"]["bunker_technology"]`` (a mapping, or
    ``None`` when there are none). For each mapped technology, its existing
    ``input`` rows are removed and re-added with ``level`` set to
    ``"bunker"``. Technologies without any ``input`` rows are skipped.

    Args:
        scen: Scenario to modify
        tec: Name of the covered trade technology
        log: Logger used to report progress
        config_tec: Technology configuration, keyed by technology name
    """
    bunker_tec = config_tec[tec][tec + "_trade"]["bunker_technology"]
    if not bunker_tec:
        return

    for technology in bunker_tec.values():
        bunkerdf_in = scen.par("input", filters={"technology": technology})
        if bunkerdf_in.empty:
            # Nothing to relabel; avoid an empty remove/add transaction.
            log.info("No input data to update for bunker technology of " + tec)
            continue

        bunkerdf_out = bunkerdf_in.assign(level="bunker")

        with scen.transact("Update bunker fuel for " + tec):
            log.info("Updating bunker level for " + tec)
            scen.remove_par("input", bunkerdf_in)
            scen.add_par("input", bunkerdf_out)
# %% Update additional parameters (separate from bilateralization)
def update_additional_parameters(
    scen: message_ix.Scenario, extra_parameter_updates: dict | None = None
):
    """Update additional parameters (separate from bilateralization).

    ``extra_parameter_updates`` maps a parameter name to a dataframe that
    selects existing rows of that parameter by its non-``value`` /
    non-``multiplier`` columns and carries one of:

    - a ``value`` column: matching rows get this value;
    - a ``multiplier`` column: matching rows have their value multiplied.

    Matching rows are removed from the scenario and the updated rows added.
    NOTE(review): a frame carrying both columns runs both steps in sequence,
    as before; that combination looks unintended -- confirm with callers.

    Args:
        scen: Scenario to modify
        extra_parameter_updates: Mapping of parameter name to update frame;
            ``None`` or empty means no updates

    Raises:
        ValueError: If an update frame has neither a ``value`` nor a
            ``multiplier`` column. This is checked before any change is
            made, because such a frame would otherwise cause the whole
            parameter to be removed.
    """
    if not extra_parameter_updates:
        return

    # Validate everything first so a bad entry cannot leave the scenario
    # partially updated or wipe a parameter.
    for par, new_df in extra_parameter_updates.items():
        if "value" not in new_df.columns and "multiplier" not in new_df.columns:
            raise ValueError(
                f"Update for parameter {par!r} needs a 'value' or "
                "'multiplier' column"
            )

    for par, new_df in extra_parameter_updates.items():
        with scen.transact("Update additional parameter: " + par):
            base_df = scen.par(par)

            if "value" in new_df.columns:
                key_cols = [c for c in new_df.columns if c != "value"]
                # Existing rows that the update addresses ...
                base_df = base_df.merge(
                    new_df[key_cols],
                    left_on=key_cols,
                    right_on=key_cols,
                    how="inner",
                )
                # ... and the same rows carrying the new value.
                add_df = base_df.copy().drop(["value"], axis=1)
                add_df = add_df.merge(
                    new_df, left_on=key_cols, right_on=key_cols, how="left"
                )

            if "multiplier" in new_df.columns:
                key_cols = [c for c in new_df.columns if c != "multiplier"]
                base_df = base_df.merge(
                    new_df, left_on=key_cols, right_on=key_cols, how="inner"
                )
                add_df = base_df.copy()
                base_df = base_df.drop(["multiplier"], axis=1)
                add_df["value"] = add_df["value"] * add_df["multiplier"]
                add_df = add_df.drop(["multiplier"], axis=1)

            scen.remove_par(par, base_df)
            scen.add_par(par, add_df)
# %% Remove PAO constraints on MESSAGEix-GLOBIOM
def remove_pao_coal_constraint(
    scen: message_ix.Scenario, log, MESSAGEix_GLOBIOM: bool = True
):
    """Remove the ``domestic_coal`` and ``domestic_gas`` relation data.

    When ``MESSAGEix_GLOBIOM`` is true, all ``relation_activity`` and
    ``relation_upper`` entries of both relations are removed in a single
    transaction. The rows are selected by relation name only. Otherwise the
    scenario is left untouched.

    Args:
        scen: Scenario to modify
        log: Logger used to report progress
        MESSAGEix_GLOBIOM: Whether the constraints should be removed
    """
    if not MESSAGEix_GLOBIOM:
        return

    parameters = ("relation_activity", "relation_upper")

    with scen.transact("Remove PAO coal and gas constraints on primary energy"):
        for relation in ("domestic_coal", "domestic_gas"):
            log.info("Removing constraints on PAO primary energy: " + relation)
            # Read both parameters before removing anything.
            rows_by_parameter = {
                name: scen.par(name, filters={"relation": relation})
                for name in parameters
            }
            for name, rows in rows_by_parameter.items():
                scen.remove_par(name, rows)
# %% Write just the GDX files
def save_to_gdx(mp: ixmp.Platform, scenario, output_path: str):
    """Write the sets and parameters of a scenario to a GDX file.

    Args:
        mp: ixmp platform whose backend performs the write
        scenario: Passed through unchanged as ``filters["scenario"]``.
            ``solve_or_save`` passes the scenario object here.
            NOTE(review): confirm what the backend expects.
        output_path: Path to save the GDX file
    """
    from ixmp.backend import ItemType

    target = Path(output_path)
    # Sets and parameters only.
    contents = ItemType.SET | ItemType.PAR
    mp._backend.write_file(target, contents, filters={"scenario": scenario})
# %% Solve or save scenario
def solve_or_save(
    mp: ixmp.Platform,
    scen: message_ix.Scenario,
    solve: bool = False,
    to_gdx: bool = False,
    gdx_location: str | None = None,
):
    """Solve the scenario, or save it to a GDX file.

    The GDX file is written only when ``to_gdx`` is true, ``solve`` is false
    and ``gdx_location`` is given; the file is named
    ``MsgData_<model>_<scenario>.gdx``. A requested export that is skipped is
    reported through the module logger instead of being dropped silently.

    Args:
        mp: ixmp platform
        scen: Scenario to solve or save
        solve: If True, solve the scenario
        to_gdx: If True, save the scenario to a GDX file (ignored when
            ``solve`` is True)
        gdx_location: Directory in which to save the GDX file
    """
    if to_gdx:
        if solve:
            logging.getLogger(__name__).info(
                "to_gdx ignored because solve=True; no separate GDX export"
            )
        elif gdx_location is None:
            logging.getLogger(__name__).warning(
                "to_gdx requested but gdx_location is None; no GDX file written"
            )
        else:
            save_to_gdx(
                mp=mp,
                scenario=scen,
                output_path=os.path.join(
                    gdx_location, f"MsgData_{scen.model}_{scen.scenario}.gdx"
                ),
            )

    if solve:
        solver = "MESSAGE"
        scen.solve(solver, solve_options=dict(lpmethod=4))
        # NOTE: only the message is printed. The explicit unlock step
        # (mp._backend.jobj.unlockRunid(scen.run_id())) is disabled.
        print("Unlock run ID of the scenario")
def load_and_clone(
    mp: ixmp.Platform,
    log: logging.Logger,
    config_base: dict,
    start_scen: str | None = None,
    start_model: str | None = None,
    target_scen: str | None = None,
    target_model: str | None = None,
) -> message_ix.Scenario:
    """Load a scenario and clone it.

    Any name not passed explicitly is read from the ``scenario`` section of
    ``config_base`` (keys ``start_model``, ``start_scen``, ``target_model``,
    ``target_scen``). The clone is made without the solution and is set as
    the default version.

    Args:
        mp: ixmp platform
        log: Logger
        config_base: Base configuration dictionary
        start_scen: Name of scenario to start from
        start_model: Name of model to start from
        target_scen: Name of scenario to target
        target_model: Name of model to target

    Returns:
        The cloned scenario.

    Raises:
        ValueError: If the start model or start scenario is neither passed
            nor present in the configuration.
    """
    # Tolerate a missing or empty "scenario" section.
    scenario_cfg = config_base.get("scenario") or {}

    # Load the starting scenario
    if start_model is None:
        start_model = scenario_cfg.get("start_model")
    if start_scen is None:
        start_scen = scenario_cfg.get("start_scen")
    if start_model is None or start_scen is None:
        raise ValueError(
            "No start model/scenario given: pass start_model and start_scen "
            "or set them in the 'scenario' section of the config"
        )

    base = message_ix.Scenario(mp, model=start_model, scenario=start_scen)
    log.info(f"Loaded scenario: {start_model}/{start_scen}")

    # Clone scenario. Unresolved target names stay None and are passed to
    # clone() as such.
    if target_model is None:
        target_model = scenario_cfg.get("target_model")
    if target_scen is None:
        target_scen = scenario_cfg.get("target_scen")

    scen = base.clone(target_model, target_scen, keep_solution=False)
    scen.set_as_default()

    log.info("Scenario loaded and cloned.")
    return scen
# %% Clone and update scenario
def load_and_solve(
    trade_dict,
    solve=False,
    to_gdx=False,
    project_name: str | None = None,
    config_name: str | None = None,
    start_scen: str | None = None,
    start_model: str | None = None,
    target_scen: str | None = None,
    target_model: str | None = None,
    scenario: message_ix.Scenario | None = None,
    extra_parameter_updates: dict | None = None,
    gdx_location: str | None = None,
    MESSAGEix_GLOBIOM: bool = True,
):
    """Clone and update scenario.

    Args:
        trade_dict: Dictionary of parameter dataframes

    Optional Args:
        solve: If True, solve scenario
        to_gdx: If True, save scenario to a GDX file
        project_name: Name of project (message_ix_models/project/[THIS])
        config_name: Name of config file. If None, uses default config from
            data/bilateralize/config_default.yaml
        start_scen: Name of scenario to start from
        start_model: Name of model to start from
        target_scen: Name of scenario to target
        target_model: Name of model to target
        scenario: Scenario to update (if None, will clone from project yaml).
            When given, its own platform is used for the GDX export.
        extra_parameter_updates: Dictionary of additional parameter updates
        gdx_location: Location to save GDX file
        MESSAGEix_GLOBIOM: If True, remove PAO coal and gas constraints
    """
    # Load config
    config_base, _config_path, config_tec = load_config(
        project_name, config_name, load_tec_config=True
    )

    log = get_logger(name="load_and_solve")
    log.info("Loading and solving scenario")

    # Load the scenario
    if scenario is None:
        mp = ixmp.Platform()
        scen = load_and_clone(
            mp=mp,
            log=log,
            config_base=config_base,
            start_scen=start_scen,
            start_model=start_model,
            target_scen=target_scen,
            target_model=target_model,
        )
    else:
        log.info(f"Using existing scenario: {scenario.model}/{scenario.scenario}")
        scen = scenario
        # Reuse the platform the scenario lives on rather than opening a
        # second, unrelated connection.
        mp = scenario.platform

    # Add sets and parameters for each covered technology
    covered_tec = config_base.get("covered_trade_technologies") or []
    if not covered_tec:
        log.warning("No covered_trade_technologies configured; nothing to add")

    for tec in covered_tec:
        # Remove existing technologies related to trade
        remove_trade_tech(scen=scen, log=log, config_tec=config_tec, tec=tec)

        # Add to sets: technology, level, commodity, mode
        add_trade_sets(scen=scen, log=log, trade_dict=trade_dict, tec=tec)

        # Add parameters
        add_trade_parameters(scen=scen, log=log, trade_dict=trade_dict, tec=tec)

        # Relation activity, upper, and lower
        update_relation_parameters(scen=scen, log=log, trade_dict=trade_dict, tec=tec)

        # Update bunker fuels
        update_bunker_fuels(scen=scen, tec=tec, log=log, config_tec=config_tec)

    # Update additional parameters
    update_additional_parameters(
        scen=scen, extra_parameter_updates=extra_parameter_updates
    )

    # Remove PAO coal and gas constraints on MESSAGEix-GLOBIOM
    remove_pao_coal_constraint(scen=scen, log=log, MESSAGEix_GLOBIOM=MESSAGEix_GLOBIOM)

    # Solve or save scenario
    solve_or_save(
        mp=mp,
        scen=scen,
        solve=solve,
        to_gdx=to_gdx,
        gdx_location=gdx_location,
    )