Source code for message_ix_models.model.buildings.build

"""Build MESSAGEix-Buildings on a MESSAGEix-GLOBIOM scenario."""

import logging
import re
from collections import defaultdict
from collections.abc import Iterable, Mapping, MutableMapping, Sequence
from copy import deepcopy
from itertools import product
from typing import TYPE_CHECKING, cast

import message_ix
import pandas as pd
from genno import Quantity
from genno.operator import mul, relabel, rename_dims
from message_ix import make_df

try:
    from ixmp.report.operator import data_for_quantity
    from message_ix.report.operator import as_message_df
except ImportError:  # ixmp/message_ix v3.7.0
    from ixmp.reporting.computations import (  # type: ignore [import-not-found,no-redef]
        data_for_quantity,
    )
    from message_ix.reporting.computations import (  # type: ignore [import-not-found,no-redef]
        as_message_df,
    )
from sdmx.model.v21 import Annotation, Code

from message_ix_models import Context, ScenarioInfo, Spec
from message_ix_models.model import build
from message_ix_models.model.bmt.utils import subtract_material_demand
from message_ix_models.model.structure import (
    generate_set_elements,
    get_codes,
    get_region_codes,
)
from message_ix_models.util import (
    broadcast,
    load_package_data,
    make_io,
    merge_data,
    nodes_ex_world,
    private_data_path,
)

from .rc_afofi import get_afofi_commodity_shares, get_afofi_technology_shares

# from message_data.projects.ngfs.util import add_macro_COVID  # Unused

if TYPE_CHECKING:
    from message_ix import Scenario

    from message_ix_models.types import MutableParameterData, ParameterData

log = logging.getLogger(__name__)

#: STURM commodity names to be converted for use in MESSAGEix-Materials; see
#: :func:`materials`.
BUILD_COMM_CONVERT = [
    "resid_mat_int_scrap_steel",
    "resid_mat_int_scrap_aluminum",
    "resid_mat_int_scrap_cement",
    "resid_mat_int_demand_steel",
    "resid_mat_int_demand_aluminum",
    "resid_mat_int_demand_cement",
    "comm_mat_int_scrap_steel",
    "comm_mat_int_scrap_aluminum",
    "comm_mat_int_scrap_cement",
    "comm_mat_int_demand_steel",
    "comm_mat_int_demand_aluminum",
    "comm_mat_int_demand_cement",
]

#: Types of materials.
#:
#: .. todo:: Move to and read from :file:`data/buildings/set.yaml`.
MATERIALS = ["steel", "cement", "aluminum"]


def adapt_emission_factors(data: "MutableParameterData") -> None:
    """Adapt ``relation_activity`` values in `data` that represent emission factors.

    In MESSAGEix-GLOBIOM, ``relation_activity`` entries for, for instance,
    r=CO_Emission are computed as (emission factor for fuel, species) × (input
    efficiency of technology consuming the fuel). Because the MESSAGE-Buildings
    representation sets the latter to 1.0, the relation_activity entries must be
    recomputed.

    This function updates the values in :py:`data["relation_activity"]`, assuming that
    :py:`data["input"]` contains the *original* (base model, MESSAGEix-GLOBIOM) input
    efficiencies. Then it sets :py:`data["input"]["value"]` to 1.0.

    .. todo:: When available in :mod:`message_ix_models`, simply read the values for
       each (fuel, species) from a file, rather than performing this calculation.
    """

    def assert_value_unique(dfgb):
        """Ensure that each group of `dfgb` contains only 1 unique "value"."""
        assert (1 == dfgb.nunique()["value"]).all()
        return dfgb

    # Common dimensions of "relation_activity" and "input", to merge on
    cols = ["node_loc", "technology", "year_act", "mode"]

    # Relations to omit from calculation
    omit = ["HFC_emission", "HFC_foam_red"]

    # - Group "input" by `cols`.
    # - Take the first value in each group; given all values are the same within
    #   groups.
    # - Rename "value" to "input" (avoiding clash with "value" in relation_activity).
    # - Drop columns not present in relation_activity.
    input_ = (
        data["input"]
        .groupby(cols)
        .pipe(assert_value_unique)
        .nth(0)
        .rename(columns={"value": "input"})
        .drop(
            "year_vtg node_origin commodity level time time_origin unit".split(),
            axis=1,
        )
    )

    # - Omit certain relations.
    # - Merge `input_` into "relation_activity" data to add an "input" column.
    # - Divide by base-model input efficiency to recover emissions factors per fuel.
    # - Drop "input" column.
    ra = "relation_activity"
    data[ra] = cast(
        pd.DataFrame,
        data[ra][~data[ra].relation.isin(omit)]
        .merge(input_, how="left", on=cols)
        .astype({"year_rel": int})
        .eval("value = value / input"),
    ).drop("input", axis=1)

    # Set input efficiencies to 1.0 per MESSAGE-Buildings representation
    data["input"] = data["input"].assign(value=1.0)

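# A minimal sketch of the core recomputation above, on toy data. This is a
# hypothetical helper, not used elsewhere in this module: the relation_activity value
# for a (node, technology, year, mode) key is divided by the base-model input
# efficiency for the same key, recovering the per-fuel emission factor.
def _demo_adapt_emission_factors() -> pd.DataFrame:
    cols = ["node_loc", "technology", "year_act", "mode"]
    input_ = pd.DataFrame(
        [["R12_NAM", "gas_resid_cook", 2030, "M1", 0.8]],
        columns=cols + ["input"],
    )
    ra = pd.DataFrame(
        # 0.4 = (0.5 emission factor) × (0.8 input efficiency)
        [["CO2_Emission", "R12_NAM", "gas_resid_cook", 2030, "M1", 0.4]],
        columns=["relation"] + cols + ["value"],
    )
    # value / input → 0.5, the emission factor per unit of fuel
    return (
        ra.merge(input_, on=cols).eval("value = value / input").drop("input", axis=1)
    )
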
def get_spec(context: Context, filter_relations: list[str] = []) -> Spec:
    """Return the specification for MESSAGEix-Buildings.

    Parameters
    ----------
    context : .Context
        The key ``regions`` determines the regional aggregation used.
    filter_relations : optional
        If given, only include these relations in :attr:`Spec.require`.

    .. todo:: Expand to handle :data:`BUILD_COMM_CONVERT`.
    """
    load_config(context)
    s = deepcopy(context["buildings spec"])

    # Use existing context.buildings if set (e.g. by BMT workflow); else default
    if getattr(context, "buildings", None) is None:
        from types import SimpleNamespace

        context.buildings = SimpleNamespace(with_materials=False)

    if context.buildings.with_materials:
        s.require.set["commodity"].extend(MATERIALS)

    # commented: See docstring of bio_backstop and comments in prepare_data, below
    # s.add.set["technology"].append(Code(id="bio_backstop"))

    # The set of required nodes varies according to context.regions
    s.require.set["node"].extend(map(str, get_region_codes(context.regions)))

    if filter_relations:
        s.require.set["relation"] = list(
            filter(
                lambda r: getattr(r, "id", r) in filter_relations,
                s.require.set["relation"],
            )
        )

    return s

def get_prices(s: message_ix.Scenario) -> pd.DataFrame:
    """Retrieve PRICE_COMMODITY for certain commodities, excluding the _GLB node."""
    result = s.var(
        "PRICE_COMMODITY",
        filters={
            "level": "final",
            "commodity": ["biomass", "coal", "lightoil", "gas", "electr", "d_heat"],
        },
    )
    return result[~result["node"].str.endswith("_GLB")]

def get_techs(spec: Spec, commodity=None) -> list[str]:
    """Return a list of buildings technologies."""
    codes: Iterable[Code] = spec.add.set["technology"]
    if commodity:
        codes = filter(lambda s: s.id.startswith(commodity), codes)
    return sorted(map(str, codes))

def get_tech_groups(
    spec: Spec, include="commodity enduse", legacy=False
) -> Mapping[str, Sequence[str]]:
    """Return groups of buildings technologies from `spec`.

    These are suitable for aggregation, e.g. in data preparation or reporting.

    Parameters
    ----------
    spec
        The result of :func:`get_spec`.
    include : str or sequence of str
        May include specific values to control what is returned:

        - "commodity": include keys like "resid gas", where "gas" is a commodity,
          collecting technologies which consume this commodity.
        - "enduse": include keys like "comm other_uses", where "other_uses" is a
          buildings energy end-use, collecting technologies which represent this
          end-use.
    legacy
        If :data:`True`, apply mapping from commodity names to labels used in legacy
        reporting code; e.g. "electr" becomes "elec".
    """
    if legacy:
        try:
            # FIXME This COMMODITY dictionary is not present in the version of the
            #       legacy reporting migrated to message_ix_models. It, or this code,
            #       must be updated in order to be usable.
            from message_ix_models.report.legacy.default_tables import (  # type: ignore [attr-defined]
                COMMODITY,
            )
        except ImportError:
            COMMODITY = dict()
    else:
        COMMODITY = dict()

    # Results
    techs = defaultdict(list)

    # Expression to match technology IDs generated per buildings/set.yaml
    # - The 'c' (commodity) group matches only the "lightoil" in "lightoil_lg"
    expr = re.compile(
        "^(?P<c>.*?)(_lg)?_((?P<sector>comm|resid)_(?P<enduse>.*)|afofi)$"
    )

    def _store(value, c, e, s):
        """Update 1 or more lists in `techs` with `value`."""
        techs[s].append(value)
        if "commodity" in include:
            techs[f"{s} {c}"].append(value)
        if "enduse" in include and e:
            techs[f"{s} {e}"].append(value)

    # Iterate over technologies
    for t in spec.add.set["technology"]:
        # Extract commodity, end-use, and sector from `expr`
        match = expr.match(t.id)
        if match is None:
            continue
        sector = match.group("sector") or "afofi"
        # For sector=AFOFI, enduse=None
        commodity, enduse = match.group("c", "enduse")

        # Omit technologies for the buildings-materials linkage
        if commodity in {"construction", "demolition"}:
            continue

        # For some base model technologies, e.g. `hp_el_rc`, thus for `hp_el_afofi`,
        # the ID does not contain the ID of the input commodity. Look up the actual
        # input commodity from annotations in technology.yaml.
        try:
            commodity = t.eval_annotation("input")[0]
        except TypeError:
            pass  # No such annotation

        # Maybe map to labels used in legacy reporting
        commodity = COMMODITY.get(commodity, commodity)

        # Update lists
        _store(match.string, commodity, enduse, sector)
        # Also update "rc" totals
        _store(match.string, commodity, enduse, "rc")

    return techs

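# A minimal sketch of how the expression in get_tech_groups() splits generated
# technology IDs (hypothetical helper, not used elsewhere in this module):
# "gas_resid_cook" yields commodity "gas", sector "resid", end-use "cook";
# "elec_afofi" has no sector/end-use groups, so sector falls back to "afofi"; and the
# non-greedy 'c' group keeps only "lightoil" from "lightoil_lg_comm_heat".
def _demo_tech_id_groups() -> dict:
    expr = re.compile(
        "^(?P<c>.*?)(_lg)?_((?P<sector>comm|resid)_(?P<enduse>.*)|afofi)$"
    )
    out = {}
    for tech_id in ("gas_resid_cook", "elec_afofi", "lightoil_lg_comm_heat"):
        m = expr.match(tech_id)
        assert m is not None
        out[tech_id] = (m.group("c"), m.group("sector") or "afofi", m.group("enduse"))
    # {"gas_resid_cook": ("gas", "resid", "cook"),
    #  "elec_afofi": ("elec", "afofi", None),
    #  "lightoil_lg_comm_heat": ("lightoil", "comm", "heat")}
    return out
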
def load_config(context: Context) -> None:
    """Load MESSAGEix-Buildings configuration from file and store on `context`.

    Model structure information is loaded from :file:`data/buildings/set.yaml` and
    derived from the base model structures. This function does most of the work for
    :func:`get_spec` (the parts that do not vary according to
    :class:`.buildings.Config`) and stores the result as the :class:`Context` key
    "buildings spec".
    """
    if "buildings spec" in context:
        return

    set_info = cast("MutableMapping", load_package_data("buildings", "set.yaml"))

    # Generate set elements from a product of others
    for set_name, info in set_info.items():
        generate_set_elements(set_info, set_name)

    # Currently unused, and generates issues when caching functions where context is
    # an argument
    set_info["technology"].pop("indexers", None)

    # Create a spec
    s = Spec()
    for set_name, info in set_info.items():
        # Elements to add, remove, and require
        for action in ("add", "remove", "require"):
            s[action].set[set_name].extend(info.get(action, []))

    # Generate commodities that replace corresponding rc_* in the base model
    for c in filter(lambda x: x.id.startswith("rc_"), get_codes("commodity")):
        new = deepcopy(c)
        new.id = c.id.replace("rc_", "afofio_")
        s.add.set["commodity"].append(new)

    # Generate technologies that replace corresponding *_rc|RC in the base model.
    # Match both _RC/_rc at the end and _RC_RT/_rc_RT patterns
    expr = re.compile("_(rc|RC)(_RT)?$")

    # Technologies that should not be transformed to afofi
    exclude_techs: dict[str, str] = {}

    for t in filter(lambda x: expr.search(x.id), get_codes("technology")):
        # Skip technologies that should not be transformed
        if t.id in exclude_techs:
            continue

        # Generate a new Code object, preserving annotations
        new = deepcopy(t)
        # Replace _RC or _rc with _afofio, preserving the _RT suffix if present,
        # e.g. sp_el_RC_RT -> sp_el_afofio_RT, loil_rc -> loil_afofio
        if t.id.endswith("_RT"):
            # Replace _RC_RT or _rc_RT with _afofio_RT
            new.id = re.sub("_(rc|RC)_RT$", "_afofio_RT", t.id)
        else:
            # Replace _RC or _rc at the end with _afofio
            new.id = re.sub("_(rc|RC)$", "_afofio", t.id)
        new.annotations.append(Annotation(id="derived-from", text=t.id))

        # This will be added
        s.add.set["technology"].append(new)

        # The original technology will be removed
        s.remove.set["technology"].append(t)

    # Store
    context["buildings spec"] = s

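# A minimal sketch of the two substitution patterns used above when deriving
# *_afofio technologies from base-model *_rc / *_RC technologies (hypothetical
# helper, not used elsewhere in this module).
def _demo_afofio_rename() -> list[str]:
    renamed = []
    for tech_id in ("loil_rc", "sp_el_RC_RT", "hp_el_rc"):
        if tech_id.endswith("_RT"):
            renamed.append(re.sub("_(rc|RC)_RT$", "_afofio_RT", tech_id))
        else:
            renamed.append(re.sub("_(rc|RC)$", "_afofio", tech_id))
    return renamed  # ["loil_afofio", "sp_el_afofio_RT", "hp_el_afofio"]
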
# def merge_data(
#     base: MutableMapping[str, pd.DataFrame], *others: Mapping[str, pd.DataFrame]
# ) -> None:
#     import message_ix_models.util
#
#     message_ix_models.util.merge_data(base, *others)

def bio_backstop(scen: "Scenario", nodes=["R12_AFR", "R12_SAS"]) -> "ParameterData":
    """Create a backstop supply of (biomass, primary) to avoid infeasibility.

    This is not currently in use; see comments in :func:`prepare_data_A`.

    This function is simplified from a version in the MESSAGE_Buildings/util/
    directory, itself modified from an old/outdated (before 2022-03) version of
    :mod:`.add_globiom`. See
    https://iiasa-ece.slack.com/archives/C03M5NX9X0D/p1659623091532079 for discussion.
    """
    # Retrieve data for the technology that will be used to create the backstop
    filters = dict(technology="elec_rc", node_loc="R12_NAM", year_act=2020)

    data = defaultdict(list)
    for node, name in product(nodes, ["output", "var_cost"]):
        values = dict(technology="bio_backstop", node_loc=node)
        if name == "output":
            values.update(commodity="biomass", node_dest=node, level="primary")
        elif name == "var_cost":
            # 2023-07-24 PNK: reduced from 1e5 to address MACRO calibration issues
            values.update(value=1e1)

        data[name].append(scen.par(name, filters=filters).assign(**values))

    result: "ParameterData" = {k: pd.concat(v) for k, v in data.items()}

    log.debug(repr(result))

    return result

def scale_and_replace(
    scenario: "Scenario",
    replace: dict,
    q_scale: Quantity,
    relations: list[str],
    relax: float = 0.0,
) -> Mapping[str, pd.DataFrame]:
    """Return scaled parameter data for certain technologies.

    The function acts on the list of parameters below.

    - For some parameters (`scale` is None), data are copied.
    - For other parameters, data are scaled by multiplication with `q_scale`.
    - For parameters with a relative sense, e.g. ``growth_activity_lo``, no further
      scaling is applied.
    - For parameters with an absolute sense, e.g. ``bound_activity_lo``, values are
      additionally scaled by a “relaxation” factor of (1 + `relax`) for upper bounds
      or (1 - `relax`) for lower bounds. Setting `relax` to 0 (the default) disables
      this behaviour.

    These operations are applied to all data for which the ``technology`` ID appears
    in ``replace["technology"]``. Finally, ``replace`` is applied to optionally
    replace technology IDs or IDs for other dimensions.

    Returns
    -------
    dict of (str -> .DataFrame)
        Keys are parameter names; values are the scaled data.
    """
    # Filters for retrieving data
    f_long = dict(filters={"technology": list(replace["technology"].keys())})
    f_short = dict(filters={"t": list(replace["technology"].keys())})
    dims = dict(mode="m", node_loc="nl", technology="t", time="h", year_act="ya")

    # Use "nl" on the scaling quantity to align with the parameters modified
    _q_scale = rename_dims(q_scale, {"n": "nl"}) if "n" in q_scale.dims else q_scale

    # Copy data for certain parameters with renamed technology & commodity
    result = dict()
    for name, scale in (
        ("capacity_factor", None),
        ("emission_factor", None),
        ("fix_cost", None),
        ("input", None),
        ("inv_cost", None),
        ("output", None),
        ("relation_activity", None),
        ("technical_lifetime", None),
        ("var_cost", None),
        # Historical
        ("historical_activity", 1.0),
        # Constraints
        ("growth_activity_lo", None),
        ("growth_activity_up", None),
        ("bound_activity_lo", 1 - relax),
        ("bound_activity_up", 1 + relax),
        ("bound_new_capacity_lo", 1 - relax),
        ("bound_new_capacity_up", 1 + relax),
        ("bound_total_capacity_lo", 1 - relax),
        ("bound_total_capacity_up", 1 + relax),
        ("initial_activity_lo", 1 - relax),
        ("initial_activity_up", 1 + relax),
        ("soft_activity_lo", None),
        ("soft_activity_up", None),
        ("growth_new_capacity_lo", None),
        ("growth_new_capacity_up", None),
        ("initial_new_capacity_lo", 1 - relax),
        ("initial_new_capacity_up", 1 + relax),
        ("soft_new_capacity_lo", None),
        ("soft_new_capacity_up", None),
    ):
        if scale is None:
            # Prepare filters
            _f = deepcopy(f_long)
            if name == "relation_activity":
                # Only copy relation_activity data for certain relations
                _f["filters"].update(relation=relations)
            df = scenario.par(name, **_f)
        else:
            # - Retrieve data as a genno.Quantity.
            # - Multiply by scaling factors.
            q = (
                data_for_quantity("par", name, "value", scenario, config=f_short)
                * _q_scale
                * Quantity(scale)
            )
            # Convert back to a message_ix data frame. as_message_df() returns
            # dict -> (str -> pd.DataFrame), so pop the single value
            df = as_message_df(q, name, dims, {}).pop(name)

        if not len(df):
            continue

        result[name] = df.replace(replace)

        # DEBUG
        # if name in (
        #     "historical_activity",
        #     "output",
        # ):
        #     print(name)
        #     print(result[name].to_string())

    log.info(f"Data for {len(result)} parameters")

    return result

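# A minimal sketch of the relaxation arithmetic applied above (hypothetical helper,
# not used elsewhere in this module). With relax=0.05 and a share of 0.3, an
# absolute lower bound of 100 becomes 100 × 0.3 × 0.95 = 28.5 and the matching upper
# bound becomes 100 × 0.3 × 1.05 = 31.5, slightly looser than pure share-scaling.
def _demo_relax(share: float = 0.3, relax: float = 0.05) -> dict:
    bound = 100.0
    return {
        "bound_activity_lo": bound * share * (1 - relax),  # 28.5
        "bound_activity_up": bound * share * (1 + relax),  # 31.5
    }
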
def prepare_data_A(
    scenario: message_ix.Scenario,
    info: ScenarioInfo,
    demand: pd.DataFrame,
    prices: pd.DataFrame,
    sturm_r: pd.DataFrame,
    sturm_c: pd.DataFrame,
    with_materials: bool,
    relations: list[str],
    afofi_demand: pd.DataFrame | None = None,
) -> "ParameterData":
    """Derive data for MESSAGEix-Buildings from `scenario`."""
    # Data frames for each parameter
    result: "MutableParameterData" = dict()

    # Mapping from original to generated commodity names
    c_map = {f"rc_{name}": f"afofi_{name}" for name in ("spec", "therm")}

    # Retrieve shares of AFOFI within rc_spec or rc_therm; dimensions (c, n). These
    # values are based on 2010 and 2015 data; see the code for details.
    c_share = get_afofi_commodity_shares()

    # Retrieve existing demands
    filters: dict[str, Iterable] = dict(c=["rc_spec", "rc_therm"], y=info.Y)
    afofi_dd = data_for_quantity(
        "par", "demand", "value", scenario, config=dict(filters=filters)
    )

    # On a second pass (after main() has already run once), rc_spec and rc_therm have
    # been stripped out, so `afofi_dd` is empty; skip manipulating it.
    if len(afofi_dd):
        # - Compute a share (c, n) of rc_* demand (c, n, …) = afofi_* demand
        # - Relabel commodities.
        tmp = relabel(mul(afofi_dd, c_share), {"c": c_map})

        # Convert back to a MESSAGE data frame
        dims = dict(commodity="c", node="n", level="l", year="y", time="h")
        # TODO Remove typing exclusion once message_ix is updated for genno 1.25
        result.update(as_message_df(tmp, "demand", dims, {}))  # type: ignore [arg-type]

    # Copy technology parameter values from rc_spec and rc_therm to new afofi.
    # Again, once rc_(spec|therm) are stripped, .par() returns nothing here, so
    # rc_techs is empty and the following loop does not run.

    # Identify technologies that output to rc_spec or rc_therm
    rc_techs = scenario.par("output", filters={"commodity": ["rc_spec", "rc_therm"]})[
        "technology"
    ].unique()

    # Mapping from source to generated names for scale_and_replace
    replace = {
        "commodity": c_map,
        "technology": {t: re.sub("(rc|RC)", "afofi", t) for t in rc_techs},
    }

    # Compute shares with dimensions (t, n) for scaling parameter data
    t_shares = get_afofi_technology_shares(c_share, replace["technology"].keys())

    merge_data(
        result,
        # TODO Remove exclusion once message-ix-models >2025.1.10 is released
        scale_and_replace(  # type: ignore [arg-type]
            scenario, replace, t_shares, relations=relations, relax=0.05
        ),
    )

    # Create new technologies for building energy.
    # Mapping from commodity to the base model's *_rc technology; see the naming
    # sketch after this function.
    rc_tech_fuel = {"lightoil": "loil_rc", "electr": "elec_rc", "d_heat": "heat_rc"}

    data = defaultdict(list)

    # Add for fuels above
    for fuel in prices["commodity"].unique():
        # Find the original rc technology for the fuel
        tech_orig = rc_tech_fuel.get(fuel, f"{fuel}_rc")

        # Create the technologies for the new commodities
        for commodity in filter(
            re.compile(f"[_-]{fuel}").search, demand["commodity"].unique()
        ):
            # Fix for lightoil gas included
            if "lightoil-gas" in commodity:
                tech_new = f"{fuel}_lg_" + commodity.replace("_lightoil-gas", "")
            else:
                tech_new = f"{fuel}_" + commodity.replace(f"_{fuel}", "")

            # Modify data
            for name, filters, extra in (  # type: ignore
                ("input", {}, {}),  # NB value=1.0 is done by adapt_emission_factors()
                ("output", {}, dict(commodity=commodity, value=1.0)),
                ("capacity_factor", {}, {}),
                ("emission_factor", {}, {}),
                ("relation_activity", dict(relation=relations), {}),
            ):
                filters["technology"] = [tech_orig]
                data[name].append(
                    scenario.par(name, filters=filters).assign(
                        technology=tech_new, **extra
                    )
                )

    # - Concatenate data frames together.
    # - Adapt relation_activity values that represent emission factors.
    # - Merge to results.
    tmp = {k: pd.concat(v) for k, v in data.items()}
    adapt_emission_factors(tmp)
    merge_data(result, tmp)

    log.info(
        "Prepared:\n" + "\n".join(f"{len(v)} obs for {k!r}" for k, v in result.items())
    )

    if with_materials:
        # Set up buildings-materials linkage
        merge_data(result, materials(scenario, info, sturm_r, sturm_c))

    # commented: This is superseded by .navigate.workflow.add_globiom_step
    # # Add data for a backstop supply of (biomass, secondary)
    # merge_data(result, bio_backstop(scenario))

    return result

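# A minimal sketch of the technology-naming rule used in both prepare_data_A() and
# prepare_data_B() (hypothetical helper, not used elsewhere in this module). The fuel
# ID is moved from the end of the commodity name to the front of the technology name;
# the combined "lightoil-gas" carrier gets an "_lg_" marker instead.
def _demo_tech_name(fuel: str, commodity: str) -> str:
    if "lightoil-gas" in commodity:
        return f"{fuel}_lg_" + commodity.replace("_lightoil-gas", "")
    return f"{fuel}_" + commodity.replace(f"_{fuel}", "")


# _demo_tech_name("gas", "resid_cook_gas") -> "gas_resid_cook"
# _demo_tech_name("lightoil", "comm_heat_lightoil-gas") -> "lightoil_lg_comm_heat"
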
def prepare_data_B(  # noqa: C901
    scenario: message_ix.Scenario,
    info: ScenarioInfo,
    prices: pd.DataFrame,
    sturm_r: pd.DataFrame,
    sturm_c: pd.DataFrame,
    demand_static: pd.DataFrame,
    with_materials: bool,
    relations: list[str],
) -> "ParameterData":
    """Derive data for MESSAGEix-Buildings from `scenario`.

    Functionally the same as :func:`prepare_data_A`.

    Input data:

    - MESSAGE-format report (demand_resid, demand_comm) and demand_static

    Buildings demand includes:

    - A: Resid and Comm cool/heat/hotwater demand (from STURM, with price iteration)
    - B: Resid app/cook demand (from ACCESS, no iteration)
    - C: Resid non-commercial biomass demand (from ACCESS, no iteration)
    - D: Resid and Comm material demand (from STURM, no iteration)
    - E: Residual AFOFIO demand (external)
    """
    # Data frames for each parameter
    result: "MutableParameterData" = dict()

    # Reset index
    for df in [sturm_r, sturm_c, demand_static]:
        if df is not None and "node" not in df.columns:
            df.reset_index(inplace=True)

    # Add 2110 data by copying from 2100 if missing; see the sketch after this
    # function
    for df_name, df in [
        ("sturm_r", sturm_r),
        ("sturm_c", sturm_c),
        ("demand_static", demand_static),
    ]:
        if df is not None and "year" in df.columns:
            if 2110 not in df["year"].values and 2100 in df["year"].values:
                # Copy 2100 data to 2110
                df_2100 = df[df["year"] == 2100].copy()
                df_2100["year"] = 2110
                # Update the original dataframe
                if df_name == "sturm_r":
                    sturm_r = pd.concat([sturm_r, df_2100], ignore_index=True)
                elif df_name == "sturm_c":
                    sturm_c = pd.concat([sturm_c, df_2100], ignore_index=True)
                elif df_name == "demand_static":
                    demand_static = pd.concat(
                        [demand_static, df_2100], ignore_index=True
                    )
                log.info(f"Added 2110 data by copying from 2100 for {df_name}")

    # Step 1: new techs/commodities for Buildings demands (parts A, B)

    # Prepare demand data
    commodity_info = cast(
        "MutableMapping", load_package_data("buildings", "commodity.yaml")
    )
    buildings_commodities = set(commodity_info.keys())
    # TODO Another way is to use the "add" entries in set.yaml
    demand = pd.concat([sturm_r, sturm_c, demand_static], ignore_index=True)
    demand = demand[demand["commodity"].isin(buildings_commodities)]
    result["demand"] = demand

    # Quit building if the scenario already has Buildings demands
    try:
        existing_commodities = set(scenario.par("demand")["commodity"].unique())
        if existing_commodities & buildings_commodities:
            log.info(
                "Scenario already has Buildings demands. "
                "Skipping technology generation."
            )
            return result
    except (KeyError, ValueError):
        pass

    # Mapping from commodity to the base model's *_rc technology
    rc_tech_fuel = {"lightoil": "loil_rc", "electr": "elec_rc", "d_heat": "heat_rc"}

    data = defaultdict(list)

    # Generate input, output, capacity_factor, emission_factor, relation_activity
    # for new technologies. Deal with 2 exceptions:
    # - Rooftop technologies for input
    # - Lightoil gas
    for fuel in prices["commodity"].unique():
        # Find the original rc technology for the fuel
        tech_orig = rc_tech_fuel.get(fuel, f"{fuel}_rc")

        # Create the technologies for the new commodities
        for commodity in filter(
            re.compile(f"[_-]{fuel}").search, demand["commodity"].unique()
        ):
            # Fix for lightoil gas included
            if "lightoil-gas" in commodity:
                tech_new = f"{fuel}_lg_" + commodity.replace("_lightoil-gas", "")
            else:
                tech_new = f"{fuel}_" + commodity.replace(f"_{fuel}", "")
            log.info(f"  Commodity: {commodity} -> Tech: {tech_new}")

            # Modify data
            for name, filters, extra in (  # type: ignore
                ("input", {}, {}),  # NB value=1.0 is done by adapt_emission_factors()
                ("output", {}, dict(commodity=commodity, value=1.0)),
                ("capacity_factor", {}, {}),
                ("emission_factor", {}, {}),
                ("relation_activity", dict(relation=relations), {}),
            ):
                filters["technology"] = [tech_orig]
                input_data = scenario.par(name, filters=filters).assign(
                    technology=tech_new, **extra
                )
                data[name].append(input_data)

                # Deal with rooftop technologies for input.
                # All newly created electricity technologies that can use rooftop PV
                # should have:
                # - M1: electr at level "final" (already exists)
                # - M2: electr at level "final_RT" (add this)
                rt_tech = [
                    "electr_comm_cool",
                    "electr_resid_cool",
                    "electr_resid_apps",
                    "electr_resid_other_uses",
                    "electr_comm_other_uses",
                    "electr_resid_cook",
                    # not hotwater and heating (need corresponding storage
                    # technologies)
                ]
                if name == "input" and len(input_data) > 0:
                    electr_inputs = input_data[
                        (input_data["technology"].isin(rt_tech))
                        & (input_data["commodity"] == "electr")
                        & (input_data["level"] == "final")
                        & (input_data["mode"] == "M1")
                    ].copy()
                    if len(electr_inputs) > 0:
                        # Create M2 inputs with level "final_RT"
                        electr_inputs_m2 = electr_inputs.assign(
                            level="final_RT", mode="M2"
                        )
                        data[name].append(electr_inputs_m2)
                        log.info(
                            f"Added {len(electr_inputs_m2)} M2 input rows "
                            f"for technology {tech_new}"
                        )
                elif name == "output" and len(input_data) > 0:
                    electr_outputs = input_data[
                        (input_data["technology"].isin(rt_tech))
                        & (input_data["mode"] == "M1")
                    ].copy()
                    if len(electr_outputs) > 0:
                        electr_outputs_m2 = electr_outputs.assign(mode="M2")
                        data[name].append(electr_outputs_m2)
                        log.info(
                            f"Added {len(electr_outputs_m2)} M2 output rows "
                            f"for technology {tech_new}"
                        )

    tmp = {k: pd.concat(v) for k, v in data.items()}
    adapt_emission_factors(tmp)
    merge_data(result, tmp)

    # Step 2: generate new technologies and commodities for Buildings demands (part C)
    try:
        existing_commodities = set(scenario.par("demand")["commodity"].unique())
        if "non-comm" not in existing_commodities:
            log.info("Scenario does not have 'non-comm' demand.")
            # TODO Add the chain to build biomass_nc technologies too
            # TODO Not clear about the logic of keeping which version of non-comm
            return result
    except (KeyError, ValueError):
        pass

    # Step 3: generate new technologies and commodities for the residual rc (part E)
    # - Replace rc_spec and rc_therm with afofio_spec and afofio_therm
    # - AFOFIO demand read from CSV files
    afofio_demand = demand_static[
        demand_static["commodity"].isin(["afofio_spec", "afofio_therm"])
    ]
    result["demand"] = pd.concat([result["demand"], afofio_demand], ignore_index=True)

    # Mapping from original to generated commodity names
    c_map = {f"rc_{name}": f"afofio_{name}" for name in ("spec", "therm")}

    # Create AFOFIO technologies by transforming RC technologies

    # Identify technologies that output to rc_spec or rc_therm
    rc_techs = scenario.par("output", filters={"commodity": ["rc_spec", "rc_therm"]})[
        "technology"
    ].unique()

    def transform_tech_name(tech_name: str) -> str:
        if tech_name.endswith("_RT"):
            # Handle _RC_RT or _rc_RT -> _afofio_RT
            return re.sub("_(rc|RC)_RT$", "_afofio_RT", tech_name)
        else:
            # Handle _RC or _rc at the end -> _afofio
            return re.sub("_(rc|RC)$", "_afofio", tech_name)

    replace = {
        "commodity": c_map,
        "technology": {t: transform_tech_name(t) for t in rc_techs},
    }

    # 1.0 scaling, as actual demand data are read in
    t_shares = Quantity(1.0, name="afofio tech share")

    merge_data(
        result,
        scale_and_replace(  # type: ignore [arg-type]
            scenario, replace, t_shares, relations=relations, relax=0.05
        ),
    )

    # Step 4: build materials for new constructions and demolitions (part D).
    # May need iteration later for projects' deep dives into material flows
    if with_materials:
        # Set up buildings-materials linkage
        merge_data(result, materials(scenario, info, sturm_r, sturm_c))

    # Step 5: other format checks and adjustments

    # Convert year columns in all DataFrames in the result dict
    for key, df in result.items():
        year_cols = [col for col in df.columns if col.startswith("year")]
        if year_cols:
            result[key] = df.astype({col: int for col in year_cols})

    def _level(c):
        return (
            "demand"
            if ("floor" in c or any(mat in c.lower() for mat in MATERIALS))
            else "useful"
        )

    result["demand"] = result["demand"].assign(
        level=result["demand"].commodity.apply(_level)
    )

    return result

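# A minimal sketch of the 2110 backfill applied to STURM and static demand inputs in
# prepare_data_B() (hypothetical helper, not used elsewhere in this module). Rows for
# the final model period are created by duplicating the 2100 rows when the input file
# stops at 2100.
def _demo_backfill_2110(df: pd.DataFrame) -> pd.DataFrame:
    if 2110 not in df["year"].values and 2100 in df["year"].values:
        tail = df[df["year"] == 2100].copy()
        tail["year"] = 2110
        df = pd.concat([df, tail], ignore_index=True)
    return df


# _demo_backfill_2110(pd.DataFrame({"year": [2095, 2100], "value": [1.0, 2.0]}))
# gains a third row (2110, 2.0).
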
def prune_spec(spec: Spec, data: "ParameterData") -> None:
    """Remove extraneous entries from `spec`."""
    for name in ("commodity", "technology"):
        values = set(data["input"][name]) | set(data["output"][name])

        # DEBUG
        # missing = map(
        #     lambda c: c.id, filter(lambda c: c.id not in values, spec.add.set[name])
        # )
        # print("\n".join(sorted(missing)))

        N = len(spec.add.set[name])
        spec.add.set[name] = sorted(
            filter(lambda c: c.id in values, spec.add.set[name])
        )
        log.info(f"Prune {N - len(spec.add.set[name])} {name} codes with no data")

        missing = values - set(spec.add.set[name]) - set(spec.require.set[name])
        if len(missing):
            log.warning(
                f"Missing {len(missing)} values:\n" + "\n".join(sorted(missing))
            )

def main(context: Context, scenario: message_ix.Scenario, *args: pd.DataFrame) -> None:
    """Set up the structure and data for MESSAGE_Buildings on `scenario`.

    The function responds to a :class:`.buildings.Config` instance at
    :py:`context.buildings`; if none exists, it is created using
    :func:`.bmt.config.load_buildings_config`; see its documentation.

    If :attr:`.buildings.Config.method` is :any:`~.buildings.config.METHOD.A`:

    - `args` must contain four (4) data frames; these are data for `demand`, `prices`,
      `sturm_r`, and `sturm_c`.
    - Model data are prepared using :func:`prepare_data_A`.

    If :attr:`.buildings.Config.method` is :any:`~.buildings.config.METHOD.B`:

    - :func:`get_spec` is called with the :py:`filter_relations` parameter, using only
      the relation set members already present on `scenario`.
    - `args` must be empty.
    - Data for demand, prices, and STURM are loaded from CSV files given by
      :attr:`.buildings.Config.data_paths`.
    - Model data are prepared using :func:`prepare_data_B`.
    - The functions :func:`_remove_rc_bounds` and
      :func:`_replace_ue_rt_share_with_share_mode` are called on `scenario`.

    Parameters
    ----------
    context
        :py:`context.buildings` may be an instance of :class:`.buildings.Config`.
    scenario
        Scenario to set up.
    """
    from .config import METHOD

    # Ensure context.buildings is set (e.g. by BMT from config.yaml); else defaults
    if "buildings" not in context:
        from message_ix_models.model.bmt.config import load_buildings_config

        # Same as in .model.bmt.workflow.generate()
        context.buildings = load_buildings_config()

    # Info about the `scenario` to be modified. If build.main() has already been run
    # on the scenario, this will reflect that, e.g. will include the structures from
    # buildings/set.yaml.
    info = ScenarioInfo(scenario)

    scenario.check_out()
    try:
        # TODO explain what this is for
        scenario.init_set("time_relative")
    except ValueError:
        pass  # Already exists

    # Generate a spec for the model.
    # For METHOD.B, restrict required relations to those present on the scenario.
    spec = get_spec(
        context,
        filter_relations=scenario.set("relation").tolist()
        if context.buildings.method is METHOD.B
        else [],
    )

    if context.buildings.method is METHOD.A:
        assert 4 == len(args)
        demand, prices, sturm_r, sturm_c = args

        # Prepare data based on the contents of `scenario`
        data = prepare_data_A(
            scenario,
            info,
            demand,
            prices,
            sturm_r,
            sturm_c,
            context.buildings.with_materials,
            relations=spec.require.set["relation"],
            afofi_demand=None,  # Use calculated AFOFI demand
        )
    elif context.buildings.method is METHOD.B:
        from pathlib import Path

        def _load_csv(attr: str, index_col=None):
            """Resolve path from context.buildings or defaults and load CSV."""
            val = context.buildings.data_paths[attr]
            path = Path(val)
            # TODO Move this path logic into .buildings.Config
            path = path if path.is_absolute() else private_data_path("buildings", val)
            return pd.read_csv(path, index_col=index_col)

        # Inputs for prepare_data_B from context.buildings or defaults
        prices = _load_csv("prices")
        sturm_r = _load_csv("sturm_r", index_col=0)
        sturm_c = _load_csv("sturm_c", index_col=0)
        demand_static = _load_csv("demand_static", index_col=0)

        # Temporary fix to remove AFOFIO demand from demand_static
        demand_static.loc[
            demand_static["commodity"].str.contains("afofio", na=False), "value"
        ] = 0

        # Prepare data based on the contents of `scenario`
        data = prepare_data_B(
            scenario,
            info,
            prices,
            sturm_r,
            sturm_c,
            demand_static,
            context.buildings.with_materials,
            relations=spec.require.set["relation"],
        )

    # Remove unused commodities and technologies
    prune_spec(spec, data)

    # Simple callback for apply_spec()
    def _add_data(s, **kw):
        return data

    # FIXME check whether this works correctly on the re-solve of a scenario that has
    #       already been set up
    options = dict(fast=True)
    build.apply_spec(scenario, spec, _add_data, **options)

    if context.buildings.method is METHOD.B:
        _remove_rc_bounds(scenario)
        _replace_ue_rt_share_with_share_mode(scenario)

    scenario.set_as_default()
    log.info(f"Built {scenario.url} and set as default")

def materials(
    scenario: message_ix.Scenario,
    info: ScenarioInfo,
    sturm_r: pd.DataFrame,
    sturm_c: pd.DataFrame,
) -> "ParameterData":
    """Integrate MESSAGEix-Buildings with MESSAGEix-Materials.

    This function prepares data for `scenario` to work with :mod:`.model.material`.
    Structural changes (addition/removal of technologies and commodities) are handled
    by :func:`get_spec` and :func:`main`.

    The data is for the "output", "input", and "demand" MESSAGE parameters. It
    includes:

    1. For new technologies like ``(construction|demolition)_(resid|comm)_build``:

       - For ``construction_*`` technologies, input of the commodities steel,
         aluminum, and cement (cf. :data:`BUILD_COMM_CONVERT`) from ``l="product"``,
         and output to ``c="(comm|resid)_floor_construction"``, ``l="demand"``.
       - For the ``demolition_*`` technologies, no input, but output to both
         ``c="(comm|resid)_floor_demolition"``, ``l="demand"`` *and* the same 3
         commodities at ``l="end_of_life"``.

    2. Adjusted values for existing "demand" parameter data at ``l="demand"`` for
       steel, aluminum, and cement, subtracting the amounts from `sturm_r` and
       `sturm_c`. The demands are not reduced below zero.
    """
    # Accumulate a list of data frames for each parameter
    result = defaultdict(list)

    # Create new input/output for building material intensities
    common = dict(
        time="year",
        time_origin="year",
        time_dest="year",
        mode="M1",
        year_vtg=info.Y,
        year_act=info.Y,
    )

    # Iterate over `BUILD_COMM_CONVERT` and nodes (excluding World and *_GLB)
    # NB probably could vectorize over `n`.
    for c, n in product(BUILD_COMM_CONVERT, nodes_ex_world(info.N)):
        rc, *_, typ, comm = c.split("_")  # First, second-to-last, and last entries

        common.update(node_loc=n, node_origin=n, node_dest=n)

        # Select data for (rc, c, n)
        df_mat = (sturm_r if rc == "resid" else sturm_c).query(
            f"commodity == '{c}' and node == '{n}'"
        )

        # Handle missing years: if 2020 is missing, use 2025; if 2110 is missing, use
        # 2100
        for target, source in [(2020, 2025), (2110, 2100)]:
            if target not in df_mat["year"].values:
                df_source = df_mat[df_mat["year"] == source]
                if len(df_source) > 0:
                    df_target = df_source.copy()
                    df_target["year"] = target
                    df_mat = pd.concat(
                        [df_mat, df_target], ignore_index=True
                    ).sort_values("year")

        # Align intensity values with the model periods; see the sketch after this
        # function
        eff = pd.concat([df_mat.value, df_mat.value.tail(1)]).iloc[-len(info.Y) :]

        if typ == "demand":
            data = make_io(
                (comm, "product", "t"),
                (f"{rc}_floor_construction", "demand", "t"),
                efficiency=eff,
                on="input",
                technology=f"construction_{rc}_build",
                **common,
            )
        elif typ == "scrap":
            data = make_io(
                (comm, "end_of_life", "t"),  # will be flipped to output
                (f"{rc}_floor_demolition", "demand", "t"),
                efficiency=eff,
                on="input",
                technology=f"demolition_{rc}_build",
                **common,
            )
            # Flip input to output (no input for demolition)
            data["output"] = pd.concat(
                [
                    data["output"],
                    data.pop("input").rename(
                        columns={
                            "node_origin": "node_dest",
                            "time_origin": "time_dest",
                        }
                    ),
                ]
            )

        for name, df in data.items():
            result[name].append(df)

    # Add floor construction/demolition demands from sturm_r, sturm_c if missing
    expr = "(comm|resid)_floor_(construc|demoli)tion"
    existing = (
        pd.concat(result["demand"], ignore_index=True)
        if result["demand"]
        else pd.DataFrame()
    )
    has_floor = (
        len(existing) > 0 and existing["commodity"].str.fullmatch(expr, na=False).any()
    )
    if not has_floor:
        floor_demand = pd.concat(
            [
                df[df["commodity"].str.fullmatch(expr, na=False)]
                for df in [sturm_r, sturm_c]
                if df is not None and len(df) > 0
            ],
            ignore_index=True,
        )
        if len(floor_demand) > 0:
            result["demand"].append(floor_demand)

    # Use the reusable function to subtract material demand. One can change the
    # `method` parameter to use different approaches:
    # - "bm_subtraction": Building material subtraction (default)
    # - "im_subtraction": Infrastructure material subtraction (to be implemented)
    # - "pm_subtraction": Power material subtraction (to be implemented)
    # - "tm_subtraction": Transport material subtraction (to be implemented)
    mat_demand = subtract_material_demand(
        scenario, info, sturm_r, sturm_c, method="bm_subtraction"
    )

    # Add the modified demand to results
    result["demand"].append(mat_demand)

    # Concatenate data frames together
    return {k: pd.concat(v) for k, v in result.items()}

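# A minimal sketch of the alignment trick used for `eff` in materials() above
# (hypothetical helper, not used elsewhere in this module). Appending the last value
# once and keeping only the trailing n_periods entries yields exactly one intensity
# per model period, repeating the final value when the STURM data is one period
# short.
def _demo_eff_alignment(values: pd.Series, n_periods: int) -> pd.Series:
    return pd.concat([values, values.tail(1)]).iloc[-n_periods:]


# _demo_eff_alignment(pd.Series([1.0, 2.0, 3.0]), 4) -> [1.0, 2.0, 3.0, 3.0]
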
_BOUND_PARAMS = [ "bound_activity_lo", "bound_activity_up", "bound_new_capacity_lo", "bound_new_capacity_up", "bound_total_capacity_lo", "bound_total_capacity_up", ] _RC_PATTERNS = ["_rc", "_RC_RT", "_afofio"] def _remove_rc_bounds(scenario: message_ix.Scenario) -> None: """Remove bound parameters for technologies containing _rc, _RC_RT, or _afofio.""" with scenario.transact("Remove bounds for residual rc"): for par_name in _BOUND_PARAMS: par_df = scenario.par(par_name) if par_df.empty: continue tech_mask = par_df["technology"].str.contains( "|".join(_RC_PATTERNS), case=False, na=False ) techs_to_remove = par_df[tech_mask] if not techs_to_remove.empty: removed_techs = sorted(techs_to_remove["technology"].unique()) log.info( f"Removing {len(techs_to_remove)} rows from {par_name} " f"for technologies: {removed_techs}" ) scenario.remove_par(par_name, techs_to_remove) def _replace_ue_rt_share_with_share_mode(scenario: message_ix.Scenario) -> None: """Replace share_commodity_up UE_RT_elec_share_RC_max with share_mode_up.""" with scenario.transact("Remove old UE_RT_elec_share_RC_max constraint"): ue_rt_rows = scenario.par( "share_commodity_up", filters={"shares": "UE_RT_elec_share_RC_max"} ) if not ue_rt_rows.empty: scenario.remove_par("share_commodity_up", ue_rt_rows) log.info( f"Removed {len(ue_rt_rows)} rows from share_commodity_up " "for UE_RT_elec_share_RC_max" ) if "UE_RT_elec_share_RC_max" in scenario.set("shares").tolist(): scenario.remove_set("shares", "UE_RT_elec_share_RC_max") log.info("Removed UE_RT_elec_share_RC_max from shares set") all_regions = sorted([r for r in scenario.set("node") if "R12_" in r]) all_years = sorted([int(y) for y in scenario.set("year")]) technologies = [ "electr_comm_cool", "electr_resid_cool", "electr_resid_apps", "electr_resid_other_uses", "electr_comm_other_uses", "electr_resid_cook", # "electr_resid_heat", # "electr_comm_heat", # "electr_resid_hotwater", # "electr_comm_hotwater", ] # techs that do not need storage representation first (YJ/QW/SJ) df_share_mode_up = ( make_df( "share_mode_up", shares="RT_elec_share_RC_max", mode="M2", time="year", value=0.35, unit="-", ) .assign(node_share=None, technology=None, year_act=None) .pipe( broadcast, node_share=all_regions, technology=technologies, year_act=all_years, ) ) with scenario.transact("add the share mode constraint"): scenario.add_set("shares", "RT_elec_share_RC_max") scenario.add_par("share_mode_up", df_share_mode_up) log.info("Added share mode constraint RT_elec_share_RC_max")
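
# A minimal sketch of the cartesian fill performed by broadcast() in
# _replace_ue_rt_share_with_share_mode() above (hypothetical helper, not used
# elsewhere in this module). A template row with node_share/technology/year_act set
# to None expands to one row per (region, technology, year) combination, e.g.
# 12 regions × 6 technologies × N years rows, each with mode="M2" and value=0.35.
def _demo_share_mode_rows(
    regions: list[str], techs: list[str], years: list[int]
) -> pd.DataFrame:
    return pd.DataFrame(
        [
            dict(node_share=n, technology=t, year_act=y, mode="M2", value=0.35)
            for n, t, y in product(regions, techs, years)
        ]
    )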