import logging
from collections.abc import Mapping
from functools import lru_cache, partial
import pandas as pd
from sdmx.model.v21 import Code
from message_ix_models import Context, ScenarioInfo
from message_ix_models.model import build
from message_ix_models.model.structure import get_codes
from message_ix_models.util import broadcast, package_data_path
from .utils import read_config
log = logging.getLogger(__name__)
def cat_tec_cooling(context: Context) -> tuple[pd.DataFrame, list[str]]:
"""
Categorize cooling technologies based on predefined types and match them with
parent technologies present in the scenario.
This function extracts cooling technology data from a CSV file, filters them
based on parent technologies available in the scenario, and categorizes each
cooling technology into a predefined type. It also retrieves a list of unique
region nodes from the scenario parameter data.
Parameters
----------
context : Context
Provides access to the current scenario and configuration.
Returns
-------
tuple[pd.DataFrame, list[str]]
- cat_tec: A DataFrame with columns:
- 'type_tec': Cooling technology category.
- 'tec': Name of the cooling technology.
- regions_df: A list of unique region nodes from the scenario.
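
    Examples
    --------
    Illustrative output shape only; actual technology names depend on the
    scenario and on ``tech_water_performance_ssp_msg.csv``, and
    ``coal_ppl__ot_fresh`` is a hypothetical cooling technology::

        >>> cat_tec, regions = cat_tec_cooling(context)  # doctest: +SKIP
        >>> cat_tec.head(2)
                             type_tec                 tec
        0  share_cooling_ot_fresh_tot  coal_ppl__ot_fresh
        1  share_cooling_cl_fresh_tot  coal_ppl__ot_fresh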
"""
# Define cooling type categories and their corresponding strings
cooling_types = {
"share_cooling_ot_fresh_tot": ["ot_fresh", "cl_fresh", "air", "ot_saline"],
"share_cooling_cl_fresh_tot": ["ot_fresh", "cl_fresh", "air", "ot_saline"],
"share_cooling_air_tot": ["ot_fresh", "cl_fresh", "air", "ot_saline"],
"share_cooling_ot_saline_tot": ["ot_fresh", "cl_fresh", "air", "ot_saline"],
"share_cooling_ot_fresh_share": ["ot_fresh"],
"share_cooling_cl_fresh_share": ["cl_fresh"],
"share_cooling_air_share": ["air"],
"share_cooling_ot_saline_share": ["ot_saline"],
}
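    # For example, a hypothetical technology "coal_ppl__ot_fresh" contains the
    # keyword "ot_fresh", so it falls into "share_cooling_ot_fresh_share" and,
    # because every *_tot entry lists all four keywords, into each of the
    # *_tot categories as well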
FILE = "tech_water_performance_ssp_msg.csv"
path = package_data_path("water", "ppl_cooling_tech", FILE)
df = pd.read_csv(path)
cooling_df = df.loc[df["technology_group"] == "cooling"].copy(deep=True)
    # Derive the parent technology of each cooling tech: names follow the
    # pattern "<parent_tech>__<cooling_mode>", so keep the part before "__"
    cooling_df["parent_tech"] = (
        cooling_df["technology_name"].astype(str).str.split("__").str[0]
    )
# Extract unique technologies
sc = context.get_scenario()
    # Get the "input" parameter data for the parent technologies in cooling_df
df = sc.par("input", filters={"technology": cooling_df["parent_tech"].unique()})
missing_tec = cooling_df["parent_tech"][
~cooling_df["parent_tech"].isin(df["technology"])
]
    # Some technologies, e.g. csp, have only an "output" parameter, no "input"
ref_output = sc.par("output", {"technology": missing_tec})
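    # Align the "output" columns (node_dest, time_dest, ...) positionally with
    # the "input" columns (node_origin, time_origin, ...) so the two frames
    # can be concatenated; this assumes both parameters have the same number
    # of dimensions in the same order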
ref_output.columns = df.columns
# merge ref_input and ref_output
df = pd.concat([df, ref_output])
parent_tech_sc = df["technology"].unique()
regions_df = df["node_loc"].unique().tolist()
# Assertion check for valid data
assert (
len(parent_tech_sc) > 0
), "No matching parent technologies found in the scenario."
assert len(regions_df) > 0, "No unique nodes (regions) found in the scenario."
    # Now filter cooling_df, keeping only rows whose parent_tech exists in the
    # scenario
cooling_df = cooling_df.loc[cooling_df["parent_tech"].isin(parent_tech_sc)].copy()
unique_technologies = cooling_df["technology_name"].unique()
# Create a list to store rows for the cat_tec DataFrame
cat_tec_rows = []
# Iterate through unique technologies
for tech in unique_technologies:
for type_tec, keywords in cooling_types.items():
for keyword in keywords:
if keyword in tech:
# Add a row to the cat_tec list with type_tec and technology
cat_tec_rows.append({"type_tec": type_tec, "tec": tech})
# Create the cat_tec DataFrame
cat_tec = pd.DataFrame(cat_tec_rows)
return cat_tec, regions_df
def get_spec(context: Context) -> Mapping[str, ScenarioInfo]:
"""Return the specification for nexus implementation
Parameters
----------
context : .Context
The key ``regions`` determines the regional aggregation used.
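
    Examples
    --------
    Sketch of a typical call (requires a fully configured :class:`.Context`;
    not executable here)::

        >>> spec = get_spec(context)  # doctest: +SKIP
        >>> sorted(spec)
        ['add', 'remove', 'require']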
"""
context = read_config()
require = ScenarioInfo()
remove = ScenarioInfo()
add = ScenarioInfo()
# cooling data included by default
# Merge technology.yaml with set.yaml
context["water set"]["cooling"]["technology"]["add"] = context["water technology"][
"cooling"
]
# Update the ScenarioInfo objects with required and new set elements
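    # Each set's config may provide "require", "remove", and "add" lists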
for set_name, config in context["water set"]["cooling"].items():
# Required elements
require.set[set_name].extend(config.get("require", []))
# Elements to remove
remove.set[set_name].extend(config.get("remove", []))
# Elements to add
add.set[set_name].extend(config.get("add", []))
if context.nexus_set == "nexus":
# Merge technology.yaml with set.yaml
context["water set"]["nexus"]["technology"]["add"] = context[
"water technology"
]["nexus"]
# Update the ScenarioInfo objects with required and new set elements
for set_name, config in context["water set"]["nexus"].items():
# Required elements
require.set[set_name].extend(config.get("require", []))
# Elements to remove
remove.set[set_name].extend(config.get("remove", []))
# Elements to add
add.set[set_name].extend(config.get("add", []))
# The set of required nodes varies according to context.regions
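    # (the children of the "World" code in the node codelist are the model
    # regions, e.g. "R11_AFR" when context.regions == "R11"; illustrative,
    # depends on the codelist)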
n_codes = get_codes(f"node/{context.regions}")
nodes = list(map(str, n_codes[n_codes.index(Code(id="World"))].child))
require.set["node"].extend(nodes)
# Share commodity for groundwater
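    # The "share_low_lim_GWat" maps below presumably support a lower limit on
    # the groundwater share of total basin water availability: the "share"
    # side counts only groundwater, while the "total" side counts surface
    # water plus groundwater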
results = {}
df_node = context.all_nodes
n = len(df_node.values)
d = {
"shares": ["share_low_lim_GWat"] * n,
"node_share": df_node,
"node": df_node,
"type_tec": ["share_low_lim_GWat_share"] * n,
"mode": ["M1"] * n,
"commodity": ["groundwater_basin"] * n,
"level": ["water_avail_basin"] * n,
}
df_share = pd.DataFrame(data=d)
df_list = df_share.values.tolist()
results["map_shares_commodity_share"] = df_list
d = {
"shares": ["share_low_lim_GWat"] * n,
"node_share": df_node,
"node": df_node,
"type_tec": ["share_low_lim_GWat_total"] * n,
"mode": ["M1"] * n,
"commodity": ["surfacewater_basin"] * n,
"level": ["water_avail_basin"] * n,
}
df_share = pd.DataFrame(data=d)
d2 = {
"shares": ["share_low_lim_GWat"] * n,
"node_share": df_node,
"node": df_node,
"type_tec": ["share_low_lim_GWat_total"] * n,
"mode": ["M1"] * n,
"commodity": ["groundwater_basin"] * n,
"level": ["water_avail_basin"] * n,
}
df_share2 = pd.DataFrame(data=d2)
df_share = pd.concat([df_share, df_share2])
df_list = df_share.values.tolist()
results["map_shares_commodity_total"] = df_list
for set_name, config in results.items():
# Sets to add
add.set[set_name].extend(config)
results = {}
# Share commodity for urban water recycling
d = {
"shares": ["share_wat_recycle"] * n,
"node_share": df_node,
"node": df_node,
"type_tec": ["share_wat_recycle_share"] * n,
"mode": ["M1"] * n,
"commodity": ["urban_collected_wst"] * n,
"level": ["water_treat"] * n,
}
df_share = pd.DataFrame(data=d)
df_list = df_share.values.tolist()
results["map_shares_commodity_share"] = df_list
        d = {
            "shares": ["share_wat_recycle"] * n,
            "node_share": df_node,
            "node": df_node,
            "type_tec": ["share_wat_recycle_total"] * n,
            "mode": ["M1"] * n,
            "commodity": ["urban_collected_wst"] * n,
            "level": ["water_treat"] * n,
        }
        # Unlike the groundwater case, the "total" here covers a single
        # commodity, so a single frame suffices
        df_share = pd.DataFrame(data=d)
        df_list = df_share.values.tolist()
        results["map_shares_commodity_total"] = df_list
for set_name, config in results.items():
# Sets to add
add.set[set_name].extend(config)
    # For both cooling and nexus, add share constraints for cooling
    # technologies via cat_tec
results = {}
cat_tec, nodes_cooling = cat_tec_cooling(context)
results["cat_tec"] = cat_tec.values.tolist()
n = len(nodes_cooling)
    # Share constraints for cooling technology types
shares_cool = [
"share_cooling_ot_fresh",
"share_cooling_cl_fresh",
"share_cooling_air",
"share_cooling_ot_saline",
]
commodity_cool = ["ot_fresh", "cl_fresh", "air", "ot_saline"]
type_tec_share = [
"share_cooling_ot_fresh_share",
"share_cooling_cl_fresh_share",
"share_cooling_air_share",
"share_cooling_ot_saline_share",
]
df_share = pd.DataFrame(
{
"shares": shares_cool,
"node_share": [None] * len(shares_cool), # Placeholder for node_share
"node": [None] * len(shares_cool), # Placeholder for node
"type_tec": type_tec_share,
"mode": "M1", # Repeat mode
"commodity": commodity_cool,
"level": "share", # Repeat level
}
).pipe(broadcast, node_share=nodes_cooling)
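    # broadcast() fills the None placeholder column by repeating each row once
    # per element of nodes_cooling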
df_share["node"] = df_share["node_share"]
    # Reorder the columns:
    # ['shares', 'node_share', 'node', 'type_tec', 'mode', 'commodity', 'level']
df_share = df_share[
["shares", "node_share", "node", "type_tec", "mode", "commodity", "level"]
]
df_list = df_share.values.tolist()
results["map_shares_commodity_share"] = df_list
# for total
type_tec_tot = [
"share_cooling_ot_fresh_tot",
"share_cooling_cl_fresh_tot",
"share_cooling_air_tot",
"share_cooling_ot_saline_tot",
]
df_share = pd.DataFrame(
{
"shares": shares_cool,
"node_share": [None] * len(shares_cool), # Placeholder for node_share
"node": [None] * len(shares_cool), # Placeholder for node
"type_tec": type_tec_tot,
"mode": "M1", # Repeat mode
"commodity": [None] * len(shares_cool),
"level": "share", # Repeat level
}
).pipe(broadcast, node_share=nodes_cooling, commodity=commodity_cool)
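    # Broadcasting over both node_share and commodity yields their full cross
    # product, so each *_tot category counts all four cooling commodities in
    # every node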
df_share["node"] = df_share["node_share"]
    # Reorder the columns:
    # ['shares', 'node_share', 'node', 'type_tec', 'mode', 'commodity', 'level']
df_share = df_share[
["shares", "node_share", "node", "type_tec", "mode", "commodity", "level"]
]
df_list = df_share.values.tolist()
results["map_shares_commodity_total"] = df_list
for set_name, config in results.items():
# Sets to add
add.set[set_name].extend(config)
return dict(require=require, remove=remove, add=add)
@lru_cache()
def generate_set_elements(set_name, match=None):
    """Generate set elements for `set_name` from the water configuration.

    Elements come from the "add" entries of the ``water set`` configuration.
    Only hierarchical sets (currently just ``technology``) contribute: each
    code (optionally restricted to the one whose ID equals `match`) is
    expanded by iteration into the result list.
    """
    codes = read_config()["water set"][set_name].get("add", [])
    hierarchical = set_name in {"technology"}
results = []
for code in codes:
if match and code.id != match:
continue
elif hierarchical:
results.extend(code)
return results
def map_basin(context: Context) -> Mapping[str, ScenarioInfo]:
"""Return specification for mapping basins to regions
The basins are spatially consolidated from HydroSHEDS basins delineation
database.This delineation is then intersected with MESSAGE regions to form new
water sector regions for the nexus module.
The nomenclature for basin names is <basin_id>|<MESSAGEregion> such as R1|AFR
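
    Examples
    --------
    Sketch of a typical call (requires a configured :class:`.Context`; not
    executable here). Basin node IDs are formed as ``"B" + BCU_name``::

        >>> spec = map_basin(context)  # doctest: +SKIP
        >>> sorted(spec)
        ['add', 'remove', 'require']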
"""
context = read_config()
add = ScenarioInfo()
require = ScenarioInfo()
remove = ScenarioInfo()
# define an empty dictionary
results = {}
# read csv file for basin names and region mapping
# reading basin_delineation
FILE = f"basins_by_region_simpl_{context.regions}.csv"
PATH = package_data_path("water", "delineation", FILE)
df = pd.read_csv(PATH)
# Assigning proper nomenclature
df["node"] = "B" + df["BCU_name"].astype(str)
df["mode"] = "M" + df["BCU_name"].astype(str)
df["region"] = (
context.map_ISO_c[context.regions]
if context.type_reg == "country"
else f"{context.regions}_" + df["REGION"].astype(str)
)
results["node"] = df["node"]
results["mode"] = df["mode"]
# map nodes as per dimensions
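    # df1 places each basin node under its parent MESSAGE region; df2 maps
    # each basin node to itself (map_node conventionally includes
    # self-mappings)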
df1 = pd.DataFrame({"node_parent": df["region"], "node": df["node"]})
df2 = pd.DataFrame({"node_parent": df["node"], "node": df["node"]})
frame = [df1, df2]
df_node = pd.concat(frame)
nodes = df_node.values.tolist()
results["map_node"] = nodes
context.all_nodes = df["node"]
for set_name, config in results.items():
# Sets to add
add.set[set_name].extend(config)
return dict(require=require, remove=remove, add=add)
def main(context: Context, scenario, **options):
"""Set up MESSAGEix-Nexus on `scenario`.
See also
--------
add_data
apply_spec
get_spec
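
    Examples
    --------
    Sketch of a typical call (assumes a configured :class:`.Context` and a
    target scenario; not executable here)::

        >>> main(context, scenario)  # doctest: +SKIP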
"""
from .data import add_data
log.info("Set up MESSAGEix-Nexus")
if context.nexus_set == "nexus":
# Add water balance
spec = map_basin(context)
        # Apply the structural changes (no parameter data added at this step)
build.apply_spec(scenario, spec, **options)
# Core water structure
spec1 = get_spec(context)
# Apply the structural changes AND add the data
build.apply_spec(scenario, spec1, partial(add_data, context=context), **options)
# Uncomment to dump for debugging
# scenario.to_excel('debug.xlsx')