Source code for message_ix_models.model.water.data.infrastructure

"""Prepare data for adding technologies related to water distribution and
treatment in urban and rural areas."""

from collections import defaultdict
from typing import Any

import pandas as pd
from message_ix import make_df

from message_ix_models import Context
from message_ix_models.model.water.utils import map_yv_ya_lt
from message_ix_models.util import (
    broadcast,
    make_matched_dfs,
    package_data_path,
    same_node,
    same_time,
)


def start_creating_input_dataframe(
    sdg: str,
    df_node: pd.DataFrame,
    df_non_elec: pd.DataFrame,
    df_dist: pd.DataFrame,
    year_wat: tuple,
    first_year: int,
    sub_time,
) -> pd.DataFrame:
    """Build ``input`` parameter data for the non-electricity commodities.

    Parameters
    ----------
    sdg : str
        Scenario policy label. The value ``"baseline"`` selects the baseline
        treatment of the distribution technologies; any other value selects
        the alternative treatment (see Returns).
    df_node : pandas.DataFrame
        Must provide a ``node`` column; its values are used as ``node_loc``.
    df_non_elec : pandas.DataFrame
        One row per non-distribution technology. Columns read: ``tec``,
        ``value_mid``, ``inlvl``, ``incmd``, ``technical_lifetime_mid``.
    df_dist : pandas.DataFrame
        One row per distribution technology. Columns read: ``tec``,
        ``value_mid``, ``value_high``, ``inlvl``, ``incmd``,
        ``technical_lifetime_mid``.
    year_wat : tuple
        Years handed to :func:`map_yv_ya_lt` as its first argument.
    first_year : int
        Handed to :func:`map_yv_ya_lt` as its third argument.
    sub_time
        Labels broadcast over the ``time`` dimension.

    Returns
    -------
    pandas.DataFrame
        Concatenation of

        - one mode ``"M1"`` block (``value_mid``) per row of `df_non_elec`;
        - for **every** row of `df_dist`: a mode ``"Mf"`` block
          (``value_high``), preceded by a mode ``"M1"`` block (``value_mid``)
          when `sdg` is ``"baseline"``.

        An empty data frame is returned when both inputs have no rows.
    """

    def _years(row: pd.Series):
        # Labels to broadcast over, derived from the row's own lifetime.
        return map_yv_ya_lt(year_wat, row["technical_lifetime_mid"], first_year)

    def _dist_input(row: pd.Series, value_column: str, mode: str) -> pd.DataFrame:
        # Input block of one distribution technology for the given mode.
        return (
            make_df(
                "input",
                technology=row["tec"],
                value=row[value_column],
                unit="-",
                level=row["inlvl"],
                commodity=row["incmd"],
                mode=mode,
            )
            .pipe(
                broadcast,
                _years(row),
                node_loc=df_node["node"],
                time=sub_time,
            )
            .pipe(same_node)
            .pipe(same_time)
        )

    frames: list[pd.DataFrame] = []

    # Input data for non-electricity commodities of regular technologies
    for _, row in df_non_elec.iterrows():
        frames.append(
            make_df(
                "input",
                technology=row["tec"],
                value=row["value_mid"],
                unit="-",
                level=row["inlvl"],
                commodity=row["incmd"],
                mode="M1",
                node_loc=df_node["node"],
            )
            .pipe(broadcast, _years(row), time=sub_time)
            .pipe(same_node)
            .pipe(same_time)
        )

    # Distribution technologies: every row contributes, not only the first.
    # The "Mf" mode is always added; "M1" only exists in the baseline.
    is_baseline = sdg == "baseline"
    for _, row in df_dist.iterrows():
        if is_baseline:
            frames.append(_dist_input(row, "value_mid", "M1"))
        frames.append(_dist_input(row, "value_high", "Mf"))

    # pd.concat() refuses an empty list, so keep the historical empty frame.
    if not frames:
        return pd.DataFrame([])
    return pd.concat(frames)


def add_infrastructure_techs(context: "Context") -> dict[str, pd.DataFrame]:
    """Process water distribution data for a scenario instance.

    The technology table is read from ``water_distribution.xlsx`` and the
    basin list from ``basins_by_region_simpl_<regions>.csv``. Distribution
    technologies (``urban_t_d``, ``urban_unconnected``,
    ``industry_unconnected``, ``rural_t_d``, ``rural_unconnected``) are
    handled separately from the others: they always get a mode ``"Mf"`` entry
    and, when ``context.SDG`` is ``"baseline"``, an additional mode ``"M1"``
    entry. All other technologies only use mode ``"M1"``.

    Parameters
    ----------
    context : .Context

    Returns
    -------
    data : dict of (str -> pandas.DataFrame)
        Keys are MESSAGE parameter names such as 'input', 'fix_cost'.
        Values are data frames ready for :meth:`~.Scenario.add_par`.
        Years in the data include the model horizon indicated by
        ``context["water build info"]``, plus the additional years 2010 and
        2015.
    """
    # Reference to the water configuration
    info = context["water build info"]
    sub_time = context.time

    # load the scenario from context
    scen = context.get_scenario()

    year_wat = (2010, 2015, *info.Y)
    # Passed to map_yv_ya_lt() together with each technology's lifetime
    first_year = scen.firstmodelyear
    is_baseline = context.SDG == "baseline"

    results: dict[str, pd.DataFrame] = {}

    # reading basin_delineation
    basin_file = f"basins_by_region_simpl_{context.regions}.csv"
    df_node = pd.read_csv(package_data_path("water", "delineation", basin_file))
    # Assigning proper nomenclature
    df_node["node"] = "B" + df_node["BCU_name"].astype(str)
    df_node["mode"] = "M" + df_node["BCU_name"].astype(str)
    df_node["region"] = (
        context.map_ISO_c[context.regions]
        if context.type_reg == "country"
        else f"{context.regions}_" + df_node["REGION"].astype(str)
    )

    # Reading water distribution mapping from the Excel workbook
    path = package_data_path("water", "infrastructure", "water_distribution.xlsx")
    df = pd.read_excel(path)

    techs = [
        "urban_t_d",
        "urban_unconnected",
        "industry_unconnected",
        "rural_t_d",
        "rural_unconnected",
    ]

    def _years(row: pd.Series):
        # Labels to broadcast over, derived from the row's own lifetime.
        return map_yv_ya_lt(year_wat, row["technical_lifetime_mid"], first_year)

    def _stack(frames: list) -> pd.DataFrame:
        # Single concat per parameter; pd.concat() rejects an empty list.
        return pd.concat(frames) if frames else pd.DataFrame([])

    def _output(row: pd.Series, mode: str) -> pd.DataFrame:
        # Output block of one technology for the given mode.
        return (
            make_df(
                "output",
                technology=row["tec"],
                value=row["out_value_mid"],
                unit="-",
                level=row["outlvl"],
                commodity=row["outcmd"],
                mode=mode,
            )
            .pipe(
                broadcast,
                _years(row),
                node_loc=df_node["node"],
                time=sub_time,
            )
            .pipe(same_node)
            .pipe(same_time)
        )

    def _var_cost(row: pd.Series, value_column: str, mode: str) -> pd.DataFrame:
        # Variable-cost block of one technology for the given mode.
        return make_df(
            "var_cost",
            technology=row["tec"],
            value=row[value_column],
            unit="USD/km3",
            mode=mode,
        ).pipe(
            broadcast,
            _years(row),
            node_loc=df_node["node"],
            time=sub_time,
        )

    # ------------------------------------------------------------------ input
    df_non_elec = df[df["incmd"] != "electr"].reset_index()
    df_dist = df_non_elec[df_non_elec["tec"].isin(techs)]
    df_non_elec = df_non_elec[~df_non_elec["tec"].isin(techs)]
    df_elec = df[df["incmd"] == "electr"].reset_index()

    inp_df = start_creating_input_dataframe(
        sdg=context.SDG,
        df_node=df_node,
        df_non_elec=df_non_elec,
        df_dist=df_dist,
        year_wat=year_wat,
        first_year=first_year,
        sub_time=sub_time,
    )
    result_dc = prepare_input_dataframe(
        context=context,
        sub_time=sub_time,
        year_wat=year_wat,
        first_year=first_year,
        df_node=df_node,
        techs=techs,
        df_elec=df_elec,
    )
    # .get() keeps this working when there is no electricity row at all
    results["input"] = pd.concat([inp_df, *result_dc.get("input", [])])

    # ----------------------------------------------------------------- output
    df_out = df[~df["outcmd"].isna()]
    df_out_dist = df_out[df_out["tec"].isin(techs)]
    df_out = df_out[~df_out["tec"].isin(techs)]

    out_frames = [_output(row, "M1") for _, row in df_out.iterrows()]
    # Each distribution technology is broadcast with its own lifetime
    # (NOTE(review): assumes these rows carry `technical_lifetime_mid`, as the
    # input and var_cost handling of the same technologies already does).
    for _, row in df_out_dist.iterrows():
        if is_baseline:
            out_frames.append(_output(row, "M1"))
        out_frames.append(_output(row, "Mf"))
    results["output"] = _stack(out_frames)

    # -------------------------------------------------------- capacity factor
    df_cap = df.dropna(subset=["capacity_factor_mid"])
    results["capacity_factor"] = _stack(
        [
            make_df(
                "capacity_factor",
                technology=row["tec"],
                value=row["capacity_factor_mid"],
                unit="%",
            )
            .pipe(
                broadcast,
                _years(row),
                node_loc=df_node["node"],
                time=sub_time,
            )
            .pipe(same_node)
            for _, row in df_cap.iterrows()
        ]
    )

    # ----------------------------------- technical lifetime, construction time
    df_tl = df.dropna(subset=["technical_lifetime_mid"])
    tl = (
        make_df(
            "technical_lifetime",
            technology=df_tl["tec"],
            value=df_tl["technical_lifetime_mid"],
            unit="y",
        )
        .pipe(broadcast, year_vtg=year_wat, node_loc=df_node["node"])
        .pipe(same_node)
    )
    results["technical_lifetime"] = tl

    cons_time = make_matched_dfs(tl, construction_time=1)
    results["construction_time"] = cons_time["construction_time"]

    # ------------------------------------------------------------------ costs
    df_inv = df.dropna(subset=["investment_mid"])
    df_var = df_inv[~df_inv["tec"].isin(techs)]
    df_var_dist = df_inv[df_inv["tec"].isin(techs)]

    # Investment costs (distribution technologies are excluded)
    inv_cost = make_df(
        "inv_cost",
        technology=df_inv["tec"],
        value=df_inv["investment_mid"],
        unit="USD/km3",
    ).pipe(broadcast, year_vtg=year_wat, node_loc=df_node["node"])
    results["inv_cost"] = inv_cost[~inv_cost["technology"].isin(techs)]

    # Fixed costs: one block per technology, built from that row's own cost
    # and lifetime. Iterating `df_var` excludes the distribution technologies.
    results["fix_cost"] = _stack(
        [
            make_df(
                "fix_cost",
                technology=row["tec"],
                value=row["fix_cost_mid"],
                unit="USD/km3",
            ).pipe(
                broadcast,
                _years(row),
                node_loc=df_node["node"],
            )
            for _, row in df_var.iterrows()
        ]
    )

    # Variable costs
    var_frames = [_var_cost(row, "var_cost_mid", "M1") for _, row in df_var.iterrows()]
    # Variable cost for distribution technologies
    for _, row in df_var_dist.iterrows():
        if is_baseline:
            var_frames.append(_var_cost(row, "var_cost_mid", "M1"))
        var_frames.append(_var_cost(row, "var_cost_high", "Mf"))
    results["var_cost"] = _stack(var_frames)

    return results


def prepare_input_dataframe(
    context: "Context",
    sub_time,
    year_wat: tuple,
    first_year: int,
    df_node: pd.DataFrame,
    techs: list[str],
    df_elec: pd.DataFrame,
) -> defaultdict[Any, list]:
    """Collect the electricity ``input`` frames for every row of `df_elec`.

    Rows whose ``tec`` is not listed in `techs` yield a single mode ``"M1"``
    frame using ``value_mid``. Rows whose ``tec`` is listed in `techs` yield
    a mode ``"Mf"`` frame using ``value_high``; when ``context.SDG`` equals
    ``"baseline"`` a mode ``"M1"`` frame using ``value_mid`` is concatenated
    after it.

    Returns
    -------
    collections.defaultdict
        The frames are appended, one entry per row, to the list stored under
        the key ``"input"``. The mapping has no keys when `df_elec` has no
        rows.
    """

    def _electr_input(record: pd.Series, value_column: str, mode: str) -> pd.DataFrame:
        # Electricity demand of one technology at level "final".
        base = make_df(
            "input",
            technology=record["tec"],
            value=record[value_column],
            unit="-",
            level="final",
            commodity="electr",
            mode=mode,
            time_origin="year",
            node_loc=df_node["node"],
            node_origin=df_node["region"],
        )
        # 1 because elec commodities don't have technical lifetime
        year_labels = map_yv_ya_lt(year_wat, 1, first_year)
        return base.pipe(broadcast, year_labels, time=sub_time)

    collected = defaultdict(list)

    for _, record in df_elec.iterrows():
        if record["tec"] not in techs:
            collected["input"].append(_electr_input(record, "value_mid", "M1"))
            continue

        with_m1 = context.SDG == "baseline"
        frame = _electr_input(record, "value_high", "Mf")
        if with_m1:
            frame = pd.concat([frame, _electr_input(record, "value_mid", "M1")])
        collected["input"].append(frame)

    return collected


def add_desalination(context: "Context") -> dict[str, pd.DataFrame]:
    """Add desalination infrastructure

    Two types of desalination are considered;
    1. Membrane
    2. Distillation

    Technology data are read from ``desalination.xlsx``, historical capacity
    and projected potential from the region-specific CSV files, and the basin
    list from ``basins_by_region_simpl_<regions>.csv``.

    Parameters
    ----------
    context : .Context

    Returns
    -------
    data : dict of (str -> pandas.DataFrame)
        Keys are MESSAGE parameter names such as 'input', 'fix_cost'.
        Values are data frames ready for :meth:`~.Scenario.add_par`.
        Years in the data include the model horizon indicated by
        ``context["water build info"]``, plus the additional years 2010 and
        2015.
    """
    results: dict[str, pd.DataFrame] = {}
    sub_time = context.time
    time_labels = pd.Series(sub_time)
    # Reference to the water configuration
    info = context["water build info"]

    # load the scenario from context
    scen = context.get_scenario()

    year_wat = (2010, 2015, *info.Y)
    # Passed to map_yv_ya_lt() together with each technology's lifetime
    first_year = scen.firstmodelyear

    # Paths of the desalination input data
    path = package_data_path("water", "infrastructure", "desalination.xlsx")
    path2 = package_data_path(
        "water",
        "infrastructure",
        f"historical_capacity_desalination_km3_year_{context.regions}.csv",
    )
    path3 = package_data_path(
        "water",
        "infrastructure",
        f"projected_desalination_potential_km3_year_{context.regions}.csv",
    )
    # Reading dataframes
    df_desal = pd.read_excel(path)
    df_hist = pd.read_csv(path2)
    df_proj = pd.read_csv(path3)
    df_proj = df_proj[df_proj["rcp"] == f"{context.RCP}"]
    df_proj = df_proj[~(df_proj["year"] == 2065) & ~(df_proj["year"] == 2075)]
    df_proj = df_proj.reset_index(drop=True)
    df_proj = df_proj[df_proj["year"].isin(info.Y)]

    # reading basin_delineation
    basin_file = f"basins_by_region_simpl_{context.regions}.csv"
    df_node = pd.read_csv(package_data_path("water", "delineation", basin_file))
    # Assigning proper nomenclature
    df_node["node"] = "B" + df_node["BCU_name"].astype(str)
    df_node["mode"] = "M" + df_node["BCU_name"].astype(str)
    df_node["region"] = (
        context.map_ISO_c[context.regions]
        if context.type_reg == "country"
        else f"{context.regions}_" + df_node["REGION"].astype(str)
    )

    def _years(lifetime):
        # Labels to broadcast over for a technology with the given lifetime.
        return map_yv_ya_lt(year_wat, lifetime, first_year)

    def _stack(frames: list) -> pd.DataFrame:
        # Single concat per parameter; pd.concat() rejects an empty list.
        return pd.concat(frames) if frames else pd.DataFrame([])

    def _final_input(row: pd.Series, value_column: str, commodity: str) -> pd.DataFrame:
        # Energy input taken at level "final" from the node's parent region.
        return make_df(
            "input",
            technology=row["tec"],
            value=row[value_column],
            unit="-",
            level="final",
            commodity=commodity,
            mode="M1",
            time_origin="year",
            node_loc=df_node["node"],
            node_origin=df_node["region"],
        ).pipe(
            broadcast,
            _years(row["lifetime_mid"]),
            time=time_labels,
        )

    def _water_flow(par_name: str, row: pd.Series, level: str, commodity: str):
        # Unit water flow ("input" or "output") of one desalination technology.
        return (
            make_df(
                par_name,
                technology=row["tec"],
                value=1,
                unit="-",
                level=level,
                commodity=commodity,
                mode="M1",
            )
            .pipe(
                broadcast,
                _years(row["lifetime_mid"]),
                node_loc=df_node["node"],
                time=time_labels,
            )
            .pipe(same_node)
            .pipe(same_time)
        )

    # output dataframe linking to desal tech types
    out_saline = (
        make_df(
            "output",
            technology="extract_salinewater_basin",
            value=1,
            unit="km3/year",
            level="water_avail_basin",
            commodity="salinewater_basin",
            mode="M1",
        )
        .pipe(
            broadcast,
            _years(20),
            node_loc=df_node["node"],
            time=time_labels,
        )
        .pipe(same_node)
        .pipe(same_time)
    )

    tl_saline = (
        make_df(
            "technical_lifetime",
            technology="extract_salinewater_basin",
            value=20,
            unit="y",
        )
        .pipe(broadcast, year_vtg=year_wat, node_loc=df_node["node"])
        .pipe(same_node)
    )

    # Historical capacity of desalination technologies
    df_hist_cap = make_df(
        "historical_new_capacity",
        node_loc="B" + df_hist["BCU_name"],
        technology=df_hist["tec_type"],
        year_vtg=df_hist["year"],
        value=df_hist["cap_km3_year"],
        unit="km3/year",
    )
    # Divide the historical capacity by 5 since the existing data is summed over
    # 5 years and model needs per year
    df_hist_cap["value"] = df_hist_cap["value"] / 5
    results["historical_new_capacity"] = df_hist_cap

    # Desalination potentials are added as an upper bound
    # to limit the salinewater extraction
    bound_up = make_df(
        "bound_total_capacity_up",
        node_loc="B" + df_proj["BCU_name"],
        technology="extract_salinewater_basin",
        year_act=df_proj["year"],
        value=df_proj["cap_km3_year"],
        unit="km3/year",
    )
    # Making negative values zero. Assign the result back: an in-place clip on
    # the selected column does not reach the frame under pandas Copy-on-Write.
    bound_up["value"] = bound_up["value"].clip(lower=0)
    # Keep only bounds for activity years after 2020
    results["bound_total_capacity_up"] = bound_up[bound_up["year_act"] > 2020]

    # Investment costs
    results["inv_cost"] = make_df(
        "inv_cost",
        technology=df_desal["tec"],
        value=df_desal["inv_cost_mid"],
        unit="USD/km3",
    ).pipe(broadcast, year_vtg=year_wat, node_loc=df_node["node"])

    # Fixed and variable costs, one block per desalination technology
    fix_frames = []
    var_frames = []
    for _, row in df_desal.iterrows():
        fix_frames.append(
            make_df(
                "fix_cost",
                technology=row["tec"],
                value=row["fix_cost_mid"],
                unit="USD/km3",
            ).pipe(
                broadcast,
                _years(row["lifetime_mid"]),
                node_loc=df_node["node"],
            )
        )
        var_frames.append(
            make_df(
                "var_cost",
                technology=row["tec"],
                value=row["var_cost_mid"],
                unit="USD/km3",
                mode="M1",
            ).pipe(
                broadcast,
                _years(row["lifetime_mid"]),
                node_loc=df_node["node"],
                time=time_labels,
            )
        )
    results["fix_cost"] = _stack(fix_frames)
    results["var_cost"] = _stack(var_frames)

    # Technical lifetime of saline-water extraction and desalination techs
    tl = pd.concat(
        [
            tl_saline,
            make_df(
                "technical_lifetime",
                technology=df_desal["tec"],
                value=df_desal["lifetime_mid"],
                unit="y",
            )
            .pipe(broadcast, year_vtg=year_wat, node_loc=df_node["node"])
            .pipe(same_node),
        ]
    )
    results["technical_lifetime"] = tl

    cons_time = make_matched_dfs(tl, construction_time=3)
    results["construction_time"] = cons_time["construction_time"]

    # Inputs: electricity for every technology, heat only where
    # `heat_input_mid` is positive, and the water commodity itself.
    df_heat = df_desal[df_desal["heat_input_mid"] > 0]
    inp_frames = [
        _final_input(row, "electricity_input_mid", "electr")
        for _, row in df_desal.iterrows()
    ]
    inp_frames += [
        _final_input(row, "heat_input_mid", "d_heat") for _, row in df_heat.iterrows()
    ]
    inp_frames += [
        _water_flow("input", row, row["inlvl"], row["incmd"])
        for _, row in df_desal.iterrows()
    ]
    # Rows with any missing entry are discarded
    results["input"] = _stack(inp_frames).dropna()

    # Outputs: saline-water extraction plus one block per desalination tech
    results["output"] = _stack(
        [
            out_saline,
            *(
                _water_flow("output", row, row["outlvl"], row["outcmd"])
                for _, row in df_desal.iterrows()
            ),
        ]
    )

    # putting a lower bound on desalination tecs based on hist capacities
    df_bound = df_hist[df_hist["year"] == 2015]
    bound_lo = make_df(
        "bound_activity_lo",
        node_loc="B" + df_bound["BCU_name"],
        technology=df_bound["tec_type"],
        mode="M1",
        value=df_bound["cap_km3_year"],
        unit="km3/year",
    ).pipe(
        broadcast,
        year_act=year_wat,
        time=time_labels,
    )
    # Divide the historical capacity by 5 since the existing data is summed over
    # 5 years and model needs per year. assign() returns a new frame, so the
    # filtered selection is never modified in place.
    results["bound_activity_lo"] = bound_lo[bound_lo["year_act"] <= 2030].assign(
        value=lambda frame: frame["value"] / 5
    )

    return results