Source code for message_ix_models.model.transport.groups

"""Consumer groups data."""

import logging
from copy import deepcopy
from typing import TYPE_CHECKING, Dict

import pandas as pd
import xarray as xr
from genno import Quantity

if TYPE_CHECKING:
    from genno import Computer

log = logging.getLogger(__name__)


[docs]def prepare_computer(c: "Computer") -> None: """Prepare `rep` for calculating transport consumer groups.""" from .key import cg, pop_at c.add("indexers:n-cd", "indexers_n_cd", "config") # Population shares by area_type c.add(pop_at, urban_rural_shares, "population:n-y", "config") # Exogenous data for consumer group sizes keys = c.infer_keys( [ "population suburb share:*:exo", "ma3t attitude:*:exo", "ma3t driver:*:exo", "ma3t population:*:exo", ] ) c.add(cg, cg_shares, pop_at, *keys, "indexers:n-cd", "indexers:cg")
[docs]def cg_shares( ursu_ru: Quantity, su_share: Quantity, ma3t_attitude: Quantity, ma3t_driver: Quantity, ma3t_pop: Quantity, n_cd_indexers: Dict[str, xr.DataArray], cg_indexers: Dict[str, xr.DataArray], ) -> Quantity: """Return shares of transport consumer groups. Parameters ---------- ursu_ru : Quantity Population shares with "UR+SU" and "RU" on the ``area_type`` dimension. su_share : Quantity Share of suburban ("SU") population within "UR+SU". ma3t_attitude: Quantity ma3t_driver : Quantity ma3t_pop : Quantity Population shares between urban, suburban, and rural. DLM: “Values from MA3T are based on 2001 NHTS survey and some more recent calculations done in 2008 timeframe. Therefore, I assume that the numbers here are applicable to the US in 2005.” NB in the spreadsheet, the data are also filled forward to 2110. Currently not used. n_cd_indexers : dict cg_indexers : dict Returns ------- .Quantity Dimensions: n, y, cg. Units.dimensionless. """ from genno.operator import concat, mul cg_indexers = deepcopy(cg_indexers) consumer_group = cg_indexers.pop("consumer_group") # Assumption: split of population between area_type 'UR' and 'SU' # - Fill forward along years, for nodes where only a year 2010 value is assumed. # - Fill backward 2010 to 2005, in order to compute. su_share = su_share.ffill("y").bfill("y") # Split the 'UR+SU' population share to "UR", "SU" using `su_share` assert {"UR+SU", "RU"} == set(ursu_ru.coords["area_type"].data) pop_share = ( concat( ursu_ru.sel(area_type="UR+SU", drop=True) * (1 - su_share), ursu_ru.sel(area_type="UR+SU", drop=True) * su_share, ursu_ru.sel(area_type="RU", drop=True), dim=pd.Index(["UR", "SU", "RU"], name="area_type"), ) .ffill("y") .bfill("y") ) # - Compute the group shares. # - Select using matched sequences, i.e. select a sequence of (node, # census_division) coordinates. # - Drop the census_division. # - Collapse area_type, attitude, driver_type dimensions into consumer_group. # - Convert to short dimension names. groups = ( mul(pop_share, ma3t_attitude, ma3t_driver) .sel(n_cd_indexers) .sel(cg_indexers) .assign_coords(consumer_group=consumer_group.values) .rename(dict(node="n", year="y", consumer_group="cg")) ) # Assert that the sum across groups is 1 assert isinstance(groups, Quantity) assert (groups.sum("cg") - 1 < 1e-5).all(), "Groups not balanced" return groups
[docs]def urban_rural_shares(pop: Quantity, config: dict) -> Quantity: """Return shares of urban and rural population. Parameters ---------- pop : .Quantity Must have n and y dimensions. config : dict The ``regions`` and ``data source/population`` keys are used. Returns ------- .Quantity Dimensions: at least area_type, possibly also n, y. Units: dimensionless. """ from genno.operator import div from message_ix_models.util import broadcast if "area_type" in pop.dims: result = div( pop.sel(area_type=["UR+SU", "RU"]), pop.sel(area_type="total", drop=True) ) else: log.warning("Population data lack 'area_type' dimension") df = ( pd.DataFrame([["UR+SU", 0.6], ["RU", 0.4]], columns=["area_type", "value"]) .assign(n=None, y=None) .pipe(broadcast, n=pop.coords["n"].data, y=pop.coords["y"].data) .set_index(["area_type", "n", "y"]) ) result = Quantity(df) assert {"area_type", "n", "y"} == set(result.dims), result.dims return result