"""Prepare data for water use for cooling & energy technologies."""
import logging

import numpy as np
import pandas as pd
from message_ix import Scenario, make_df
from message_ix_models import Context
from message_ix_models.model.water.data.demands import read_water_availability
from message_ix_models.model.water.utils import map_yv_ya_lt
from message_ix_models.util import (
    broadcast,
    minimum_version,
    package_data_path,
    same_node,
    same_time,
)
@minimum_version("message_ix 3.7")
def map_basin_region_wat(context: "Context") -> pd.DataFrame:
    """Calculate the share of water availability of each basin in its parent region.

    The parent region is either a global MESSAGE region or, when
    ``context.type_reg == "country"``, the single country given by
    ``context.map_ISO_c[context.regions]``.

    Parameters
    ----------
    context : .Context
        Must provide ``regions``, ``RCP``, ``REL``, ``type_reg``, ``time`` and the
        ``"water build info"`` entry (whose ``Y`` lists the model years).

    Returns
    -------
    data : pandas.DataFrame
        Long-format frame with one row per basin and date, restricted to the
        model years. Columns: the basin label (named ``"region"`` for annual
        runs and ``"node"`` for sub-annual runs), ``"mode"``, ``"date"``,
        ``"MSGREG"``, ``"share"``, ``"year"`` and ``"time"``.
    """
    info = context["water build info"]

    # Annual and sub-annual runs differ only in the availability file that is
    # read, the name given to the basin column and how "time" is derived.
    annual = "year" in context.time
    if annual:
        avail_file = f"qtot_5y_{context.RCP}_{context.REL}_{context.regions}.csv"
        basin_col = "region"
    else:
        avail_file = f"qtot_5y_m_{context.RCP}_{context.REL}_{context.regions}.csv"
        basin_col = "node"

    # Basin delineation, used to label the rows of the availability table
    df_x = pd.read_csv(
        package_data_path(
            "water", "delineation", f"basins_by_region_simpl_{context.regions}.csv"
        )
    )
    # Water availability per basin (rows) and date (columns)
    df_sw = pd.read_csv(package_data_path("water", "availability", avail_file))
    df_sw.drop(columns="Unnamed: 0", inplace=True)

    # Rows of both files are paired by position
    # NOTE(review): assumes both CSV files list the basins in the same order
    df_sw["BCU_name"] = df_x["BCU_name"]
    df_sw["MSGREG"] = (
        context.map_ISO_c[context.regions]
        if context.type_reg == "country"
        else f"{context.regions}_" + df_sw["BCU_name"].str.split("|").str[-1]
    )
    df_sw = df_sw.set_index(["MSGREG", "BCU_name"])

    # Ratio of water availability in each basin to the total of its region.
    # transform() keeps the (MSGREG, BCU_name) index unchanged, so the result
    # does not depend on whether groupby().apply() prepends the group key.
    df_sw = df_sw / df_sw.groupby(level="MSGREG").transform("sum")
    df_sw.reset_index(inplace=True)

    # Basin nodes are prefixed with "B", the matching modes with "M"
    basin_name = df_sw["BCU_name"].astype(str)
    df_sw["Region"] = "B" + basin_name
    df_sw["Mode"] = "M" + basin_name
    df_sw.drop(columns=["BCU_name"], inplace=True)

    # Wide (one column per date) -> long format
    df_sw.set_index(["MSGREG", "Region", "Mode"], inplace=True)
    df_sw = df_sw.stack().reset_index(level=0).reset_index()
    df_sw.columns = pd.Index([basin_col, "mode", "date", "MSGREG", "share"])
    df_sw.sort_values([basin_col, "date", "MSGREG", "share"], inplace=True)

    dates = pd.DatetimeIndex(df_sw["date"])
    df_sw["year"] = dates.year
    df_sw["time"] = "year" if annual else dates.month

    # Keep model years only
    return df_sw[df_sw["year"].isin(info.Y)].reset_index(drop=True)
def _region_supply_output(technology, level, commodity, years, regions):
    """Return "output" rows of a region-level extraction technology.

    One row per entry of `years` (with ``year_vtg == year_act``) and per region,
    using mode "M1" and the annual time slice; :func:`same_node` is applied
    after broadcasting over ``node_loc``.
    """
    # "output" has no time_origin dimension, so that key is not passed here.
    return (
        make_df(
            "output",
            technology=technology,
            value=1,
            unit="km3",
            year_vtg=years,
            year_act=years,
            level=level,
            commodity=commodity,
            mode="M1",
            time="year",
            time_dest="year",
        )
        .pipe(broadcast, node_loc=regions)
        .pipe(same_node)
    )


def _basin_balance_input(technology, commodity, years, basins, times):
    """Return "input" rows of a basin-level balancing technology.

    The technology draws `commodity` from level "water_avail_basin" in every
    basin and sub-annual time slice, with ``year_vtg == year_act``.
    """
    return (
        make_df(
            "input",
            technology=technology,
            value=1,
            unit="-",
            level="water_avail_basin",
            commodity=commodity,
            mode="M1",
            year_vtg=years,
            year_act=years,
        )
        .pipe(broadcast, node_loc=basins, time=times)
        .pipe(same_node)
        .pipe(same_time)
    )


def _basin_flow(par, technology, level, commodity, basins, yv_ya, times):
    """Return unit "input" or "output" rows of an extraction technology.

    `par` selects the parameter; the second node column (``node_origin`` for
    "input", ``node_dest`` for "output") is set to the basin itself. Rows are
    broadcast over the vintage/activity years in `yv_ya` and over `times`.
    """
    other_node = "node_origin" if par == "input" else "node_dest"
    return (
        make_df(
            par,
            technology=technology,
            value=1,
            unit="-",
            level=level,
            commodity=commodity,
            mode="M1",
            node_loc=basins,
            **{other_node: basins},
        )
        .pipe(broadcast, yv_ya, time=times)
        .pipe(same_time)
    )


def _electricity_input(technology, value, regions, basins, yv_ya, times):
    """Return "input" rows for electricity used by an extraction technology.

    Electricity ("electr" at level "final") comes from the parent region at the
    annual time slice and is consumed in the basin in each of `times`.
    """
    return make_df(
        "input",
        technology=technology,
        value=value,
        unit="-",
        level="final",
        commodity="electr",
        mode="M1",
        time_origin="year",
        node_origin=regions,
        node_loc=basins,
    ).pipe(broadcast, yv_ya, time=times)


def add_water_supply(context: "Context") -> dict[str, pd.DataFrame]:
    """Add water supply infrastructure.

    This function links the water supply based on different settings and options.
    It defines the supply linkages for freshwater, groundwater and salinewater.

    For ``context.nexus_set == "cooling"`` only region-level "output" data are
    produced. For ``"nexus"`` the basin-level extraction technologies and the
    ``basin_to_reg`` link are parametrized. For any other value the returned
    dict is empty.

    Parameters
    ----------
    context : .Context

    Returns
    -------
    data : dict of (str -> pandas.DataFrame)
        Keys are MESSAGE parameter names: "output" for the cooling set; "input",
        "historical_new_capacity", "output", "var_cost", "share_mode_up",
        "technical_lifetime", "inv_cost" and "fix_cost" for the nexus set.
        Values are data frames ready for :meth:`~.Scenario.add_par`.
        Years in the data include the model horizon indicated by
        ``context["water build info"]``, plus the additional years 2010 and 2015.
    """
    log = logging.getLogger(__name__)

    results = {}

    # Reference to the water configuration
    info = context["water build info"]

    # Load the scenario described by the context
    scen = Scenario(context.get_platform(), **context.core.scenario_info)

    # Model horizon preceded by the periods 2010 and 2015
    year_wat = (2010, 2015, *info.Y)
    sub_time = context.time
    times = pd.Series(sub_time)
    first_year = scen.firstmodelyear

    log.debug("sub_time = %s", sub_time)
    log.debug("model years = %s", info.Y)
    log.debug("year_wat = %s", year_wat)

    # Basin delineation
    df_node = pd.read_csv(
        package_data_path(
            "water", "delineation", f"basins_by_region_simpl_{context.regions}.csv"
        )
    )
    # Basin nodes are prefixed with "B", the matching modes with "M"
    df_node["node"] = "B" + df_node["BCU_name"].astype(str)
    df_node["mode"] = "M" + df_node["BCU_name"].astype(str)
    df_node["region"] = (
        context.map_ISO_c[context.regions]
        if context.type_reg == "country"
        else f"{context.regions}_" + df_node["REGION"].astype(str)
    )
    basins = df_node["node"]
    # Names of the energy MESSAGE regions
    node_region = df_node["region"].unique()

    # Groundwater energy intensity
    df_gwt = pd.read_csv(
        package_data_path(
            "water", "availability", f"gw_energy_intensity_depth_{context.regions}.csv"
        )
    )
    df_gwt["region"] = (
        context.map_ISO_c[context.regions]
        if context.type_reg == "country"
        else f"{context.regions}_" + df_gwt["REGION"].astype(str)
    )

    # Historical new capacity of surface- and groundwater extraction
    df_hist = pd.read_csv(
        package_data_path(
            "water",
            "availability",
            f"historical_new_cap_gw_sw_km3_year_{context.regions}.csv",
        )
    )
    df_hist["BCU_name"] = "B" + df_hist["BCU_name"].astype(str)

    if context.nexus_set == "cooling":
        # Region-level supply of surface water, groundwater and saline water
        results["output"] = pd.concat(
            [
                _region_supply_output(
                    "extract_surfacewater",
                    "water_supply",
                    "freshwater",
                    year_wat,
                    node_region,
                ),
                _region_supply_output(
                    "extract_groundwater",
                    "water_supply",
                    "freshwater",
                    year_wat,
                    node_region,
                ),
                _region_supply_output(
                    "extract_salinewater",
                    "saline_supply",
                    "saline_ppl",
                    year_wat,
                    node_region,
                ),
            ]
        )

    elif context.nexus_set == "nexus":
        # Technical lifetimes, also used to derive vintage/activity year pairs
        lifetime = {
            "extract_surfacewater": 50,
            "extract_groundwater": 20,
            "extract_gw_fossil": 20,
        }
        yv_ya_sw = map_yv_ya_lt(
            year_wat, lifetime["extract_surfacewater"], first_year
        )
        # Shared by extract_groundwater and extract_gw_fossil
        yv_ya_gw = map_yv_ya_lt(year_wat, lifetime["extract_groundwater"], first_year)

        # Inputs of the slack technologies balancing availability with demands,
        # and of the dummy technology linking basin supply to the energy regions
        inp = pd.concat(
            [
                _basin_balance_input(
                    "return_flow", "surfacewater_basin", year_wat, basins, times
                ),
                _basin_balance_input(
                    "gw_recharge", "groundwater_basin", year_wat, basins, times
                ),
                make_df(
                    "input",
                    technology="basin_to_reg",
                    value=1,
                    unit="-",
                    level="water_supply_basin",
                    commodity="freshwater_basin",
                    mode=df_node["mode"],
                    node_origin=basins,
                    node_loc=df_node["region"],
                )
                .pipe(broadcast, year_vtg=year_wat, time=times)
                .pipe(same_time),
            ]
        )
        # basin_to_reg was broadcast over year_vtg only
        inp["year_act"] = inp["year_vtg"]

        # Electricity use of groundwater extraction: depth-dependent intensity
        # from the data file plus a constant term
        gw_electricity = df_gwt["GW_per_km3_per_year"] + 0.043464579

        inp = pd.concat(
            [
                inp,
                # Water drawn by the extraction technologies
                _basin_flow(
                    "input",
                    "extract_surfacewater",
                    "water_avail_basin",
                    "surfacewater_basin",
                    basins,
                    yv_ya_sw,
                    times,
                ),
                _basin_flow(
                    "input",
                    "extract_groundwater",
                    "water_avail_basin",
                    "groundwater_basin",
                    basins,
                    yv_ya_gw,
                    times,
                ),
                # Electricity for surface water extraction
                # low: 0.001141553, mid: 0.018835616, high: 0.03652968
                _electricity_input(
                    "extract_surfacewater",
                    0.018835616,
                    df_node["region"],
                    basins,
                    yv_ya_sw,
                    times,
                ),
                _electricity_input(
                    "extract_groundwater",
                    gw_electricity,
                    df_node["region"],
                    basins,
                    yv_ya_gw,
                    times,
                ),
                # Fossil groundwater: twice as much as normal groundwater
                _electricity_input(
                    "extract_gw_fossil",
                    gw_electricity * 2,
                    df_node["region"],
                    basins,
                    yv_ya_gw,
                    times,
                ),
            ]
        )

        if context.type_reg == "global":
            # NOTE(review): node_loc of extract_gw_fossil rows holds basin names
            # ("B..."), so the comparison with "R11_SAS" cannot match and this
            # adjustment currently has no effect. Kept unchanged; confirm
            # whether node_origin was intended.
            inp.loc[
                (inp["technology"].str.contains("extract_gw_fossil"))
                & (inp["year_act"] == 2020)
                & (inp["node_loc"] == "R11_SAS"),
                "value",
            ] *= 0.5

        results["input"] = inp

        # Historical new capacity; the value is divided by 5 (n period)
        results["historical_new_capacity"] = pd.concat(
            [
                make_df(
                    "historical_new_capacity",
                    node_loc=df_hist["BCU_name"],
                    technology="extract_surfacewater",
                    value=df_hist["hist_cap_sw_km3_year"] / 5,
                    unit="km3/year",
                    year_vtg=2015,
                ),
                make_df(
                    "historical_new_capacity",
                    node_loc=df_hist["BCU_name"],
                    technology="extract_groundwater",
                    value=df_hist["hist_cap_gw_km3_year"] / 5,
                    unit="km3/year",
                    year_vtg=2015,
                ),
            ]
        )

        output_df = pd.concat(
            [
                # Freshwater supplied within each basin
                _basin_flow(
                    "output",
                    "extract_surfacewater",
                    "water_supply_basin",
                    "freshwater_basin",
                    basins,
                    yv_ya_sw,
                    times,
                ),
                _basin_flow(
                    "output",
                    "extract_groundwater",
                    "water_supply_basin",
                    "freshwater_basin",
                    basins,
                    yv_ya_gw,
                    times,
                ),
                _basin_flow(
                    "output",
                    "extract_gw_fossil",
                    "water_supply_basin",
                    "freshwater_basin",
                    basins,
                    yv_ya_gw,
                    times,
                ),
                # Saline water supply for regions
                _region_supply_output(
                    "extract_salinewater",
                    "saline_supply",
                    "saline_ppl",
                    year_wat,
                    node_region,
                ),
                # Dummy technology linking basin supply to the energy regions
                make_df(
                    "output",
                    technology="basin_to_reg",
                    value=1,
                    unit="-",
                    level="water_supply",
                    commodity="freshwater",
                    time_dest="year",
                    node_loc=df_node["region"],
                    node_dest=df_node["region"],
                    mode=df_node["mode"],
                ).pipe(broadcast, year_vtg=year_wat, time=times),
            ]
        )
        # NOTE(review): this assignment applies to every row, so it also
        # replaces the vintage/activity year pairs of the extraction
        # technologies with year_act == year_vtg (unlike "input", where only
        # the first three technologies are affected). Kept unchanged because
        # altering it would change model results; confirm the intent.
        output_df["year_act"] = output_df["year_vtg"]
        results["output"] = output_df

        # Dummy variable cost for the dummy water-to-energy technology
        var = make_df(
            "var_cost",
            technology="basin_to_reg",
            mode=df_node["mode"],
            node_loc=df_node["region"],
            value=20,
            unit="-",
        ).pipe(broadcast, year_vtg=year_wat, time=times)
        var["year_act"] = var["year_vtg"]
        results["var_cost"] = var

        # Upper bound on the share of each basin mode in basin_to_reg
        df_sw = map_basin_region_wat(context)
        results["share_mode_up"] = make_df(
            "share_mode_up",
            shares="share_basin",
            technology="basin_to_reg",
            mode=df_sw["mode"],
            node_share=df_sw["MSGREG"],
            time=df_sw["time"],
            value=df_sw["share"],
            unit="%",
            year_act=df_sw["year"],
        )

        results["technical_lifetime"] = pd.concat(
            [
                make_df(
                    "technical_lifetime",
                    technology=technology,
                    value=years,
                    unit="y",
                )
                .pipe(broadcast, year_vtg=year_wat, node_loc=basins)
                .pipe(same_node)
                for technology, years in lifetime.items()
            ]
        )

        # Investment costs; fossil groundwater is assumed far more expensive
        # than normal groundwater
        inv_cost = {
            "extract_surfacewater": 155.57,
            "extract_groundwater": 54.52,
            "extract_gw_fossil": 54.52 * 150,
        }
        results["inv_cost"] = pd.concat(
            [
                make_df(
                    "inv_cost",
                    technology=technology,
                    value=cost,
                    unit="USD/km3",
                ).pipe(broadcast, year_vtg=year_wat, node_loc=basins)
                for technology, cost in inv_cost.items()
            ]
        )

        results["fix_cost"] = make_df(
            "fix_cost",
            technology="extract_gw_fossil",
            value=300,  # assumed
            unit="USD/km3",
        ).pipe(broadcast, yv_ya_gw, node_loc=basins)

    return results
def add_e_flow(context: "Context") -> dict[str, pd.DataFrame]:
    """Add environmental flows.

    This function bounds the available water and allocates the environmental
    flows. Environmental flow bounds are calculated using the Variable Monthly
    Flow (VMF) method. The VMF method is applied to wet and dry seasonal runoff
    values. These wet and dry seasonal values are then aggregated to annual
    values. Environmental flows are incorporated in the model as lower bounds on
    the activity of the 'return_flow' technology, which ensures that a certain
    amount of water remains in the basin.

    Parameters
    ----------
    context : .Context

    Returns
    -------
    data : dict of (str -> pandas.DataFrame)
        Empty if ``context.SDG == "baseline"``. Otherwise it has the single key
        "bound_activity_lo", whose value is a data frame ready for
        :meth:`~.Scenario.add_par`, limited to years from 2025 onwards that are
        part of the model horizon indicated by ``context["water build info"]``.
    """
    results = {}

    info = context["water build info"]

    # Surface water availability; the groundwater frame is not needed here
    df_sw, _ = read_water_availability(context)

    # Basin delineation, used to index the environmental flow table
    df_x = pd.read_csv(
        package_data_path(
            "water", "delineation", f"basins_by_region_simpl_{context.regions}.csv"
        )
    )

    # Surface water availability per basin, used below to cap the bounds
    dmd_df = make_df(
        "demand",
        node="B" + df_sw["Region"].astype(str),
        commodity="surfacewater_basin",
        level="water_avail_basin",
        year=df_sw["year"],
        time=df_sw["time"],
        value=df_sw["value"],
        unit="km3/year",
    )
    dmd_df = dmd_df[dmd_df["year"] >= 2025].reset_index(drop=True)
    # Anything that is not >= 0 (negative values and NaN) becomes 0
    dmd_df["value"] = dmd_df["value"].where(dmd_df["value"] >= 0, 0)

    # Annual and sub-annual runs differ only in the file that is read and in
    # how the "time" column is derived.
    annual = "year" in context.time
    if annual:
        eflow_file = f"e-flow_{context.RCP}_{context.regions}.csv"
    else:
        eflow_file = f"e-flow_5y_m_{context.RCP}_{context.regions}.csv"

    # Environmental flow per basin (rows) and date (columns)
    df_env = pd.read_csv(package_data_path("water", "availability", eflow_file))
    df_env.drop(columns="Unnamed: 0", inplace=True)
    # NOTE(review): this assigns the row labels of df_x, not the basin names, so
    # "Region" below holds those labels; confirm that this matches the node
    # names derived from read_water_availability().
    df_env.index = df_x["BCU_name"].index
    df_env = df_env.stack().reset_index()
    df_env.columns = pd.Index(["Region", "years", "value"])
    df_env.sort_values(["Region", "years", "value"], inplace=True)
    df_env.fillna(0, inplace=True)
    df_env.reset_index(drop=True, inplace=True)

    dates = pd.DatetimeIndex(df_env["years"])
    df_env["year"] = dates.year
    df_env["time"] = "year" if annual else dates.month

    # Re-use the 2100 values for 2110. The column must be assigned here:
    # `.loc["year"] = 2110` would append a row labelled "year" filled with 2110
    # and leave the copied rows at 2100.
    df_env_2110 = df_env[df_env["year"] == 2100].copy()
    df_env_2110["year"] = 2110
    df_env = pd.concat([df_env, df_env_2110], ignore_index=True)

    # Keep model years only
    df_env = df_env[df_env["year"].isin(info.Y)]

    if context.SDG != "baseline":
        # Lower bound on return_flow activity to enforce environmental flows
        eflow_df = make_df(
            "bound_activity_lo",
            node_loc="B" + df_env["Region"].astype(str),
            technology="return_flow",
            year_act=df_env["year"],
            mode="M1",
            time=df_env["time"],
            value=df_env["value"],
            unit="km3/year",
        )
        eflow_df["value"] = eflow_df["value"].where(eflow_df["value"] >= 0, 0)
        eflow_df = eflow_df[eflow_df["year_act"] >= 2025].reset_index(drop=True)

        # Both frames are sorted the same way and then compared row by row
        # NOTE(review): assumes both frames cover the same basins, years and
        # time slices in the same order - confirm
        dmd_df.sort_values(by=["node", "year"], inplace=True)
        dmd_df.reset_index(drop=True, inplace=True)
        eflow_df.sort_values(by=["node_loc", "year_act"], inplace=True)
        eflow_df.reset_index(drop=True, inplace=True)

        # Cap the environmental flow at 70% of the available surface water
        cap = 0.7 * dmd_df["value"]
        eflow_df["value"] = np.where(eflow_df["value"] >= cap, cap, eflow_df["value"])

        results["bound_activity_lo"] = eflow_df

    return results