Source code for message_ix_models.tools.costs.gdp

import logging

import numpy as np
import pandas as pd
from genno import KeySeq

from message_ix_models import Context

from .config import Config

log = logging.getLogger(__name__)


[docs] def process_raw_ssp_data(context: Context, config: Config) -> pd.DataFrame: """Retrieve SSP data as required for :mod:`.tools.costs`. This method uses :class:`.SSPOriginal` and :class:`.SSPUpdate` via :func:`.exo_data.prepare_computer` Returns ------- pandas.DataFrame with the columns: - scenario_version: version of SSP scenario data - scenario: scenario (SSP1-5, LED) - region: region name - year: year of data - total_population: total population aggregated to the regional level - total_gdp: total GDP aggregated to the regional level - gdp_ppp_per_capita: regional GDP per capita in PPP - gdp_ratio_reg_to_reference: ratio of regional GDP per capita to \ reference region's GDP per capita """ from collections import defaultdict import xarray as xr from genno import Computer, Key, Quantity, quote from message_ix_models.project.ssp.data import SSPUpdate # noqa: F401 from message_ix_models.tools.exo_data import prepare_computer # Computer to hold computations c = Computer() # Common dimensions dims = ("n", "y", "scenario") def broadcast_qty(s) -> Quantity: """Return a quantity with a "scenario" dimension with the single label `s`. Multiplying this by any other quantity adds the "scenario" dimension.""" return Quantity(xr.DataArray([1.0], coords={"scenario": [s]})) c.add("LED:scenario", broadcast_qty("LED")) # Keys prepared in the loop keys = defaultdict(list) for n in "12345": # Source/scenario identifier ssp = f"ICONICS:SSP(2024).{n}" # Add a quantity for broadcasting c.add(f"SSP{n}:scenario", broadcast_qty(f"SSP{n}")) # Both population and GDP data for source_kw in ( dict(measure="POP", model="IIASA-WiC POP 2023", name=f"_pop {n}"), dict(measure="GDP", model="OECD ENV-Growth 2023", name=f"_gdp {n}"), ): m = source_kw["measure"].lower() # Add tasks to `c` that retrieve and (partly) process data from the database key, *_ = prepare_computer(context, c, ssp, source_kw, strict=False) # Add a "scenario" dimension for label in [f"SSP{n}"] + (["LED"] if n == "2" else []): keys[m].append(c.add(f"{m} {label}", "mul", key, f"{label}:scenario")) # Concatenate single-scenario data k_pop = Key("pop", dims) c.add(k_pop, "concat", *keys["pop"]) k_gdp = KeySeq("gdp", dims) c.add(k_gdp.base, "concat", *keys["gdp"]) # Further calculations # GDP per capita c.add(k_gdp["cap"], "div", k_gdp.base, k_pop) # Ratio to reference region value c.add( k_gdp["indexed"], "index_to", k_gdp["cap"], quote("n"), quote(config.ref_region) ) def merge(*dfs: pd.DataFrame) -> pd.DataFrame: """Merge data to a single data frame with the expected format.""" return ( pd.concat( [ dfs[0].to_series().rename("total_population"), dfs[1].to_series().rename("total_gdp"), dfs[2].to_series().rename("gdp_ppp_per_capita"), dfs[3].to_series().rename("gdp_ratio_reg_to_reference"), ], axis=1, ) .reset_index() .rename(columns={"n": "region", "y": "year"}) .sort_values(by=["scenario", "region", "year"]) .assign(scenario_version="2023") ) k_result = "data::pandas" c.add(k_result, merge, k_pop, k_gdp.base, k_gdp["cap"], k_gdp["indexed"]) # log.debug(c.describe(k_result)) # DEBUG Show what would be done result = c.get(k_result) # Ensure no NaN values in the ratio column assert not result.gdp_ratio_reg_to_reference.isna().any() return result
[docs] def adjust_cost_ratios_with_gdp(region_diff_df, config: Config): """Calculate adjusted region-differentiated cost ratios. This function takes in a data frame with region-differentiated cost ratios and calculates adjusted region-differentiated cost ratios using GDP per capita data. Parameters ---------- region_diff_df : pandas.DataFrame Output of :func:`apply_regional_differentiation`. config : .Config The function responds to, or passes on to other functions, the fields: :attr:`~.Config.base_year`, :attr:`~.Config.node`, :attr:`~.Config.ref_region`, :attr:`~.Config.scenario`, and :attr:`~.Config.scenario_version`. Returns ------- pandas.DataFrame DataFrame with columns: - scenario_version: scenario version - scenario: SSP scenario - message_technology: message technology - region: R11, R12, or R20 region - year - gdp_ratio_reg_to_reference: ratio of GDP per capita in respective region to GDP per capita in reference region. - reg_cost_ratio_adj: adjusted region-differentiated cost ratio """ from .projections import _maybe_query_scenario, _maybe_query_scenario_version context = Context.get_instance(-1) context.model.regions = config.node # - Retrieve GDP from SSP databases and compute and ratios (per capita; versus # ref_region. # - Keep only data from y₀ onwards. # - Map "scenario_version" strings to the desired output. # - Set the dtype of the "year" column. # - Filter on config.scenario and config.scenario_version, if configured. df_gdp = ( process_raw_ssp_data(context, config) .query("year >= @config.y0") .drop(columns=["total_gdp", "total_population"]) .assign( scenario_version=lambda x: np.where( x.scenario_version.str.contains("2013"), "Previous (2013)", "Review (2023)", ) ) .astype({"year": int}) .pipe(_maybe_query_scenario, config) .pipe(_maybe_query_scenario_version, config) ) # If base year does not exist in GDP data, then use earliest year in GDP data and # give warning base_year = config.base_year if base_year not in df_gdp.year.unique(): new_base_year = min(df_gdp.year.unique()) log.warning(f"Use year={new_base_year} GDP data as proxy for {base_year}") base_year = new_base_year def _constrain_cost_ratio(df: pd.DataFrame, base_year): """Constrain "reg_cost_ratio_adj". In cases where gdp_ratio_reg_to_reference is < 1 and reg_cost_ratio_adj > 1 in the base period, ensure reg_cost_ratio_adj(y) <= reg_cost_ratio_adj(base_year) for all future periods y. """ ref = df.query("year == @base_year").iloc[0] if ref.gdp_ratio_reg_to_reference < 1 and ref.reg_cost_ratio_adj > 1: return df.assign( reg_cost_ratio_adj=df.reg_cost_ratio_adj.clip( upper=ref.reg_cost_ratio_adj ) ) else: return df # 1. Select base-year GDP data for "gdp_ratio_reg_to_reference". # 2. Drop "year". # 3. Merge `df_region_diff` for "reg_cost_ratio". # 4. Compute slope. # 5. Compute intercept. # 6. Drop "gdp_ratio_reg_to_reference"—because of (1–2), this is the base period # value only. # 7. Merge `df_gdp` again to re-adds "year" and "gdp_ratio_reg_to_reference" with # distinct values for each period. # 8. Compute ref_cost_ratio_adj # 9. Fill 1.0 where NaNs occur in (8), i.e. for the reference region. # 10. Group by (sv, s, r, t) and apply _constrain_cost_ratio(), above, to each # group. # 11. Select the desired columns. return ( df_gdp.query("year == @base_year") .drop("year", axis=1) .merge(region_diff_df, on=["region"]) .eval("slope = (reg_cost_ratio - 1) / (gdp_ratio_reg_to_reference - 1)") .eval("intercept = 1 - slope") .drop("gdp_ratio_reg_to_reference", axis=1) .merge(df_gdp, on=["scenario_version", "scenario", "region"], how="right") .eval("reg_cost_ratio_adj = slope * gdp_ratio_reg_to_reference + intercept") .fillna({"reg_cost_ratio_adj": 1.0}) .groupby( ["scenario_version", "scenario", "region", "message_technology"], group_keys=False, ) .apply(_constrain_cost_ratio, base_year)[ [ "scenario_version", "scenario", "message_technology", "region", "year", "gdp_ratio_reg_to_reference", "reg_cost_ratio_adj", ] ] )