Source code for message_ix_models.model.transport.plot

"""Plots for MESSAGEix-Transport reporting."""

from pathlib import Path
from typing import cast

import pandas as pd
import plotnine as p9
from iam_units import registry

from message_ix_models.model.workflow import STAGE
from message_ix_models.report.plot import COMMON, LabelFirst, Plot, PlotFromIAMC

from . import key

#: Common, static settings
STATIC = [
    COMMON["A4 landscape"],
]


class BaseEnergy0(Plot):
    """Transport final energy intensity of GDP."""

    basename = "base-fe-intensity-gdp"
    inputs = ["fe intensity:nl-ya:units"]
    static = STATIC + [
        p9.aes(x="ya", y="value", color="nl"),
        p9.geom_line(),
        p9.geom_point(),
        p9.labs(x="Period", y="", color="Node"),
        p9.expand_limits(y=[0, 0.1]),
    ]

    def generate(self, data):
        self.unit = data["unit"].unique()[0]
        return p9.ggplot(data) + self.static + self.ggtitle()


class CapNewLDV(Plot):
    # FIXME remove hard-coded units
    """New LDV capacity [10⁶ vehicle]."""

    basename = "cap-new-t-ldv"
    inputs = ["historical_new_capacity:nl-t-yv:ldv", "CAP_NEW:nl-t-yv:ldv"]
    static = STATIC + [
        p9.aes(x="yv", y="value", color="t"),
        p9.geom_vline(xintercept=2020, size=4, color="white"),
        p9.geom_line(),
        p9.geom_point(),
        p9.labs(x="Period", y="", color="LDV technology"),
    ]

    def generate(self, data0, data1):
        # - Concatenate data0 (values in "historical_new_capacity" column) and
        #   data1 (values in "CAP_NEW" column).
        # - Fill with zeros.
        # - Compute a "value" column: one or the other.
        # - Remove some errant values for R12_GLB.
        #   FIXME Investigate and remove the source
        data = (
            pd.concat([data0, data1])
            .fillna(0)
            .eval("value = CAP_NEW + historical_new_capacity")
            .query("nl != 'R12_GLB'")
        )

        yield from [ggplot for _, ggplot in self.groupby_plot(data, "nl")]


def read_csvs(stem: str, *paths: Path, **kwargs) -> pd.DataFrame:
    """Read and concatenate data for debugging plots.

    - Read data from files named :file:`{stem}.csv` in each of `paths`.
    - Store with shortened scenario labels extracted from the `paths`.
    - Concatenate to a single data frame with a "scenario" column.
    """

    def label_from(dirname: Path) -> str:
        """Extract e.g. "SSP(2024).2-R12-B" from "debug-ICONICS_SSP(2024).2-R12-B"."""
        return dirname.parts[-1].split("ICONICS_", maxsplit=1)[-1]

    kwargs.setdefault("comment", "#")

    return pd.concat(
        {
            label_from(p): pd.read_csv(p.joinpath(f"{stem}.csv"), **kwargs)
            for p in paths
        },
        names=["scenario"],
    ).reset_index("scenario")


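# Illustrative usage (added note, not part of the original module): given two debug
# output directories that each contain a pdt.csv file, read_csvs() returns a single
# data frame with a "scenario" column distinguishing them, e.g.:
#
#   df = read_csvs(
#       "pdt",
#       Path("debug-ICONICS_SSP(2024).1-R12-B"),
#       Path("debug-ICONICS_SSP(2024).2-R12-B"),
#   )
#   sorted(df["scenario"].unique())
#   # → ["SSP(2024).1-R12-B", "SSP(2024).2-R12-B"]
#
# The directory names here are hypothetical, following the pattern in label_from().

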
class ComparePDT(Plot):
    """Passenger activity.

    This plot is used in :func:`.transport.build.debug_multi`, not in ordinary
    reporting. Rather than receiving data from computed quantities already in the
    graph, it reads them from files named :file:`pdt.csv` (per :attr:`.measure`) in
    the directories generated by workflow steps like "SSP1 debug build"
    (:func:`.transport.build.debug`).

    - One page per |n|.
    - 5 horizontal panels for |t| (=transport modes).
    - One line with points per scenario, coloured by scenario.
    """

    basename = "compare-pdt"
    stage = STAGE.BUILD
    single = False
    static = STATIC + [
        p9.aes(x="y", y="value", color="scenario"),
        p9.facet_wrap("t", ncol=5),
        p9.geom_line(),
        p9.geom_point(size=0.5),
        p9.scale_y_log10(),
        p9.labs(y="Activity"),
    ]

    #: Base name for source data files, for instance :file:`pdt.csv`.
    measure = "pdt"
    #: Units of input files.
    unit = "km/a"
    #: Unit adjustment factor.
    factor = 1e6

    def generate(self, *paths: Path):
        data = cast(
            pd.DataFrame,
            read_csvs(self.measure, *paths).eval("value = value / @self.factor"),
        )

        # Add the factor to the unit expression
        if self.factor != 1.0:
            self.unit = f"{self.factor:.0e} {self.unit}"

        for _, ggplot in self.groupby_plot(data, "n"):
            yield ggplot + p9.expand_limits(y=[5e-2, max(data["value"])])


class ComparePDTCap0(ComparePDT):
    """Passenger activity per capita.

    Identical to :class:`.ComparePDT`, but reads from :file:`pdt-cap.csv` instead.
    """

    basename = "compare-pdt-cap"
    measure = "pdt-cap"
    factor = 1e3


#: Common layers for :class:`ComparePDTCap1` and :class:`DemandExoCap1`.
PDT_CAP_GDP_STATIC = STATIC + [
    p9.aes(x="gdp", y="value", color="t"),
    p9.geom_line(),
    p9.geom_point(),
    p9.scale_x_log10(),
    p9.scale_y_log10(),
    p9.labs(x="GDP [10³ USD_2017 / capita]", y="", color="Transport mode"),
]


class ComparePDTCap1(Plot):
    """Passenger activity.

    Similar to :class:`DemandExoCap1`, except comparing multiple scenarios.

    - One page per |n|.
    - 5 horizontal panels for scenarios.
    - One line with points per |t| (=transport mode), coloured by mode.
    """

    basename = "compare-pdt-capita-gdp"
    stage = STAGE.BUILD
    single = False
    static = PDT_CAP_GDP_STATIC + [
        p9.facet_wrap("scenario", ncol=5),
    ]
    unit = "km/a"

    def generate(self, *paths: Path):
        # Read data
        df_pdt = read_csvs("pdt-cap", *paths)
        df_gdp = read_csvs("gdp-ppp-cap", *paths)

        # Merge data from the two quantities; keep separate column names
        # NB Same as DemandExoCap1, except on=["scenario", …]
        data = df_pdt.merge(
            df_gdp.rename(columns={"value": "gdp", "unit": "gdp_unit"}),
            on=["scenario", "n", "y"],
        )

        # Set limits for the log-log plot
        stats = data.describe()
        # NB Do not set common x limits across pages/nodes; only within.
        limits = p9.expand_limits(y=[3e1, stats.loc["max", "value"]])

        yield from [ggplot + limits for _, ggplot in self.groupby_plot(data, "n")]


class InvCost0(Plot):
    """All transport investment cost."""

    basename = "inv-cost-transport"
    inputs = ["inv_cost:nl-t-yv:transport all"]
    static = STATIC + [
        p9.aes(x="yv", y="inv_cost", color="t"),
        p9.geom_line(),
        p9.geom_point(),
    ]

    def generate(self, data):
        y_max = max(data["inv_cost"])
        self.unit = data["unit"].unique()[0]

        for _, ggplot in self.groupby_plot(data, "nl"):
            yield ggplot + p9.expand_limits(y=[0, y_max])


class InvCost1(InvCost0):
    """LDV transport investment cost.

    Same as InvCost0, but for LDV techs only.
    """

    basename = "inv-cost-ldv"
    inputs = ["inv_cost:nl-t-yv:ldv"]


class InvCost2(InvCost0):
    """Non-LDV transport investment cost.

    Same as InvCost0, but for non-LDV techs only.
    """

    basename = "inv-cost-nonldv"
    inputs = ["inv_cost:nl-t-yv:non-ldv"]


class FixCost(Plot):
    """Fixed cost."""

    basename = "fix-cost"
    inputs = ["fix_cost:nl-t-yv-ya:transport all"]
    static = STATIC + [
        p9.aes(x="ya", y="fix_cost", color="t", group="t * yv"),
        p9.geom_line(),
        p9.geom_point(),
    ]

    def generate(self, data):
        y_max = max(data["fix_cost"])
        self.unit = data["unit"].unique()[0]

        for _, ggplot in self.groupby_plot(data, "nl"):
            yield ggplot + p9.expand_limits(y=[0, y_max])


class VarCost(Plot):
    """Variable cost."""

    basename = "var-cost"
    inputs = ["var_cost:nl-t-yv-ya:transport all"]
    static = STATIC + [
        p9.aes(x="ya", y="var_cost", color="t", group="t * yv"),
        p9.geom_line(),
        p9.geom_point(),
    ]

    def generate(self, data):
        y_max = max(data["var_cost"])
        self.unit = data["unit"].unique()[0]

        for _, ggplot in self.groupby_plot(data, "nl"):
            yield ggplot + p9.expand_limits(y=[0, y_max])


class LDV_IO(Plot):
    """Input efficiency [GWa / km]."""

    basename = "ldv-efficiency"
    inputs = ["input:nl-t-yv-ya:transport all"]
    static = STATIC + [
        p9.aes(x="ya", y="input", color="t"),
        # TODO remove typing exclusion once plotnine >0.12.4 is released
        p9.facet_wrap(
            ["nl"],
            ncol=2,
            labeller=LabelFirst("node: {}"),  # type: ignore [arg-type]
        ),
        p9.geom_line(),
        p9.geom_point(),
        p9.labs(x="Period", y="", color="LDV technology"),
    ]

    def generate(self, data):
        return p9.ggplot(data) + self.static + self.ggtitle()


class OutShareLDV0(Plot):
    """Share of total LDV output [Ø]."""

    basename = "out-share-t-ldv"
    inputs = ["out:nl-t-ya:ldv+units"]
    static = STATIC + [
        p9.aes(x="ya", y="value", fill="t"),
        p9.geom_bar(stat="identity", width=4),
        # # Select a palette with up to 12 colors
        # p9.scale_fill_brewer(type="qual", palette="Set3"),
        p9.labs(x="Period", y="", fill="LDV technology"),
    ]

    def generate(self, data):
        # Normalize data
        # TODO Do this in genno
        data["value"] = data["value"] / data.groupby(["nl", "ya"])["value"].transform(
            "sum"
        )

        yield from [ggplot for _, ggplot in self.groupby_plot(data, "nl")]


class OutShareLDV1(Plot):
    """Share of LDV usage [Ø]."""

    basename = "out-share-t-cg-ldv"
    inputs = ["out:nl-t-ya-c", "cg"]
    static = STATIC + [
        p9.aes(x="ya", y="value", fill="t"),
        p9.facet_wrap(["c"], ncol=5),
        p9.geom_bar(stat="identity", width=4),
        p9.labs(x="Period", y="", fill="LDV technology"),
    ]

    def generate(self, data, cg):
        # TODO Do these operations in reporting for broader reuse
        # - Recover the consumer group code from the commodity code.
        # - Select only the consumer groups.
        # - Recover the LDV technology code from the usage technology code.
        data = (
            data.assign(c=lambda df: df.c.str.replace("transport pax ", ""))
            .query("c in @cg")
            .assign(t=lambda df: df.t.str.split(" usage by ", expand=True)[0])
        )

        # Normalize data
        data["value"] = data["value"] / data.groupby(["c", "nl", "ya"])[
            "value"
        ].transform("sum")

        yield from [ggplot for _, ggplot in self.groupby_plot(data, "nl")]


def c_group(df: pd.DataFrame, cg):
    return df.assign(
        c_group=df.c.apply(
            lambda v: "transport pax LDV" if any(cg_.id in v for cg_ in cg) else v
        )
    )


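# Note (added for clarity): c_group() maps every commodity whose name contains one of
# the consumer-group IDs in `cg` to the single label "transport pax LDV", and leaves
# all other commodities unchanged. Demand0, Demand1, and DemandCap use this to collect
# per-consumer-group LDV demand into one plotted category.

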
class Demand0(Plot):
    """Passenger transport demand [pass · km / a]."""

    basename = "demand"
    inputs = ["demand:n-c-y", "c::transport", "cg"]
    static = STATIC + [
        p9.aes(x="y", y="demand", fill="c_group"),
        p9.geom_bar(stat="identity", width=4),
        p9.labs(x="Period", y="", fill="Transport mode"),
    ]

    @staticmethod
    def _prep_data(data, commodities, cg):
        # Convert and select data
        _commodity = list(map(str, commodities))
        return (
            data.query("c in @_commodity")
            .pipe(c_group, cg)
            .groupby(["c_group", "n", "y"])
            .aggregate({"demand": "sum"})
            .reset_index()
        )

    def generate(self, data, commodities, cg):
        data = self._prep_data(data, commodities, cg)
        yield from [ggplot for _, ggplot in self.groupby_plot(data, "n")]


class Demand1(Demand0):
    """Share of transport demand [Ø]."""

    basename = "demand-share"

    def generate(self, data, commodities, cg):
        data = self._prep_data(data, commodities, cg)

        # Normalize
        data["demand"] = data["demand"] / data.groupby(["n", "y"])["demand"].transform(
            "sum"
        )

        yield from [ggplot for _, ggplot in self.groupby_plot(data, "n")]


class DemandCap(Plot):
    """Transport demand per capita [km / a]."""

    basename = "demand-capita"
    inputs = ["demand:n-c-y:capita", "c::transport", "cg"]
    static = STATIC + [
        p9.aes(x="y", y="value", fill="c"),
        p9.geom_bar(stat="identity", width=4),
        p9.labs(x="Period", y="", fill="Transport mode group"),
    ]

    def generate(self, data, commodities, cg):
        # Convert and select data
        data = data.query(f"c in {list(map(str, commodities))!r}").pipe(c_group, cg)

        for _, ggplot in self.groupby_plot(data, "n"):
            yield ggplot


def _reduce_units(df: pd.DataFrame, target_units) -> tuple[pd.DataFrame, str]:
    df_units = df["unit"].unique()
    assert 1 == len(df_units)
    tmp = registry.Quantity(1.0, df_units[0]).to(target_units)
    return (
        cast(pd.DataFrame, df.eval("value = value * @tmp.magnitude")).assign(
            unit=f"{tmp.units:~}"
        ),
        f"{tmp.units:~}",
    )


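# Note (added for clarity): _reduce_units() rescales the "value" column of a frame
# whose "unit" column contains exactly one pint-compatible unit string, returning the
# converted frame and the abbreviated target unit. For example, with unit "km / a" and
# target_units "Mm / a", every value is multiplied by 1e-3 and the returned unit
# string is "Mm / a".

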
class DemandExo(Plot):
    """Passenger transport activity."""

    basename = "demand-exo"
    stage = STAGE.BUILD
    inputs = [key.pdt_nyt]
    static = STATIC + [
        p9.aes(x="y", y="value", fill="t"),
        p9.geom_bar(stat="identity", width=4),
        p9.labs(x="Period", y="", fill="Mode (tech group)"),
    ]

    def generate(self, data):
        # FIXME shouldn't need to change dtype here
        data = data.astype(dict(value=float))
        data, self.unit = _reduce_units(data, "Gp km / a")
        y_max = max(data["value"])

        for _, ggplot in self.groupby_plot(data, "n"):
            yield ggplot + p9.expand_limits(y=[0, y_max])


class DemandExoCap0(Plot):
    """Passenger transport activity per person."""

    basename = "demand-exo-capita"
    stage = STAGE.BUILD
    inputs = [key.pdt_nyt + "capita+post"]
    static = STATIC + [
        p9.aes(x="y", y="value", fill="t"),
        p9.geom_bar(stat="identity", width=4),
        p9.labs(x="Period", y="", fill="Transport mode"),
    ]

    def generate(self, data):
        # FIXME shouldn't need to change dtype here
        data = data.astype(dict(value=float))
        data, self.unit = _reduce_units(data, "Mm / a")
        y_max = max(data["value"])

        for _, ggplot in self.groupby_plot(data, "n"):
            yield ggplot + p9.expand_limits(y=[0, y_max])


class DemandExoCap1(DemandExoCap0):
    """Transport demand per capita.

    Unlike :class:`DemandExoCap0`, this uses GDP per capita as the abscissa/
    x-aesthetic.
    """

    basename = "demand-exo-capita-gdp"
    stage = STAGE.BUILD
    inputs = [key.pdt_nyt + "capita+post", key.gdp_cap]
    static = PDT_CAP_GDP_STATIC

    def generate(self, df_pdt, df_gdp):
        # Merge data from the two quantities; keep separate column names
        data = df_pdt.merge(
            df_gdp.rename(columns={"value": "gdp", "unit": "gdp_unit"}), on=["n", "y"]
        )
        data, self.unit = _reduce_units(data, "km / a")

        # Set limits for the log-log plot
        stats = data.describe()
        limits = p9.expand_limits(
            x=stats.loc[["min", "max"], "gdp"], y=[3e1, stats.loc["max", "value"]]
        )

        yield from [ggplot + limits for _, ggplot in self.groupby_plot(data, "n")]


class EnergyCmdty0(Plot):
    """Energy input to transport [GWa]."""

    basename = "energy-c"
    inputs = ["y0", "in:nl-ya-c:transport all"]
    static = STATIC + [
        p9.aes(x="ya", y="value", fill="c"),
        p9.geom_bar(stat="identity", width=5, color="black"),
        p9.labs(x="Period", y="Energy", fill="Commodity"),
    ]

    def generate(self, y0: int, data):
        # Discard data for certain commodities
        data = data[
            ~(
                data.c.str.startswith("transport")
                | (data.c == "disutility")
                | (data.ya < y0)
            )
        ]

        for _, ggplot in self.groupby_plot(data, "nl"):
            yield ggplot


class EnergyCmdty1(EnergyCmdty0):
    """Share of energy input to transport [Ø]."""

    basename = "energy-c-share"

    def generate(self, y0: int, data):
        # Discard data for certain commodities
        data = data[
            ~(
                data.c.str.startswith("transport")
                | (data.c == "disutility")
                | (data.ya < y0)
            )
        ]

        # Normalize data
        # TODO Do this in genno
        data["value"] = data["value"] / data.groupby(["nl", "ya"])["value"].transform(
            "sum"
        )

        for _, ggplot in self.groupby_plot(data, "nl"):
            yield ggplot


class MultiFE(PlotFromIAMC):
    """Final energy, passenger."""

    basename = "multi-fe"
    single = False
    inputs = ["all:n-s-UNIT-v-y"]
    # NB This pattern excludes the total. A pattern which includes the total (as an
    #    empty string in the 'v' dimension) is:
    #    r"Final Energy\|Transportation\|P\|?(|Electricity|Gas|Hydrogen|Liquids.*)"
    iamc_variable_pattern = (
        r"Final Energy\|Transportation\|P\|(Electricity|Gas|Hydrogen|Liquids.*)"
    )
    static = STATIC + [
        p9.aes(x="y", y="value", fill="v"),
        p9.facet_wrap("s", ncol=3, nrow=5),
        p9.geom_area(color="black", size=0.2),
        p9.scale_fill_brewer(type="qualitative", palette="Paired"),
        p9.labs(x="Period", y="", fill="Technology"),
        COMMON["A3 portrait"],
    ]

    def generate(self, data):  # pragma: no cover
        # Show only data in the model horizon
        # TODO Remove once historical data are included in the input
        data = data.query("y >= 2020")

        self.unit = data["unit"].unique()[0]

        for _, ggplot in self.groupby_plot(data, "n"):
            # Maximum y-limit for this node
            y_max = max(ggplot.data["value"])
            yield ggplot + p9.expand_limits(y=[0, y_max])


class MultiStock(PlotFromIAMC):
    """LDV technology stock."""

    basename = "multi-stock"
    single = False
    inputs = ["all:n-s-UNIT-v-y"]
    iamc_variable_pattern = r"Transport\|Stock\|Road\|Passenger\|LDV\|(.*)"
    static = STATIC + [
        p9.aes(x="y", y="value", color="v"),
        p9.facet_wrap("s", ncol=3, nrow=5),
        p9.geom_point(),
        p9.geom_line(),
        p9.scale_color_brewer(type="qualitative", palette="Paired"),
        p9.labs(x="Period", y="", color="Technology"),
        COMMON["A3 portrait"],
    ]

    def generate(self, data):  # pragma: no cover
        # Show only data in the model horizon
        # TODO Remove once historical data are included in the input
        data = data.query("y >= 2020")

        self.unit = data["unit"].unique()[0]

        for _, ggplot in self.groupby_plot(data, "n"):
            # Maximum y-limit for this node
            y_max = max(ggplot.data["value"])
            yield ggplot + p9.expand_limits(y=[0, y_max])


class Scale1Diff(Plot):
    """scale-1 factor in y=2020; changes between 2 scenarios."""

    basename = "scale-1-diff"
    stage = STAGE.BUILD
    inputs = ["scale-1:nl-t-c-l-h:a", "scale-1:nl-t-c-l-h:b"]

    _s, _v, _y = "scenario", "value", "t + ' ' + c"
    static = STATIC + [
        p9.aes(x=_v, y=_y, yend=_y, group=_s, color=_s, shape=_s),
        p9.facet_wrap("nl", scales="free_x"),
        p9.geom_vline(p9.aes(xintercept=_v), pd.DataFrame([[1.0]], columns=[_v])),
        p9.geom_point(),
        p9.scale_shape(unfilled=True),
        p9.scale_x_log10(),
        p9.labs(x="", y="Mode × commodity"),
    ]

    def generate(self, data_a, data_b):
        # Data for plotting points
        df0 = (
            pd.concat([data_a.assign(scenario="a"), data_b.assign(scenario="b")])
            .drop(["l", "h", "unit"], axis=1)
            .query("nl != 'R12_GLB'")
        )
        # Data for plotting segments/arrows
        df1 = (
            df0.pivot(columns="scenario", index=["nl", "t", "c"])
            .set_axis(["value", "xend"], axis=1)
            .reset_index()
            .assign(scenario="diff")
        )

        return (
            p9.ggplot(df0)
            + p9.geom_segment(p9.aes(xend="xend"), df1, arrow=p9.arrow(length=0.03))
            + self.static
            + self.ggtitle()
        )


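# Note (added for clarity): in Scale1Diff.generate(), df1 contains one row per
# (nl, t, c) combination, with the scenario "a" value in the "value" column and the
# scenario "b" value in "xend". Each geom_segment() arrow therefore points from the
# scale-1 factor in scenario "a" to the corresponding factor in scenario "b".

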
class Stock0(Plot):
    """LDV transport vehicle stock."""

    basename = "stock-ldv"
    # Partial sum over driver_type dimension
    inputs = ["CAP:nl-t-ya:ldv+units"]
    static = STATIC + [
        p9.aes(x="ya", y="CAP", color="t"),
        p9.geom_line(),
        p9.geom_point(),
        p9.labs(x="Period", y="", color="Powertrain technology"),
    ]

    def generate(self, data):
        y_max = max(data["CAP"])
        self.unit = data["unit"].unique()[0]

        for _, ggplot in self.groupby_plot(data, "nl"):
            yield ggplot + p9.expand_limits(y=[0, y_max])


class Stock1(Plot):
    """Non-LDV transport vehicle stock.

    Same as Stock0, but for non-LDV techs only.
    """

    basename = "stock-non-ldv"
    inputs = ["CAP:nl-t-ya:non-ldv+units"]
    static = STATIC + [
        p9.aes(x="ya", y="CAP", color="t"),
        p9.geom_line(),
        p9.geom_point(),
        p9.labs(x="Period", y="", color="Powertrain technology"),
    ]

    def generate(self, data):
        if not len(data):
            return

        y_max = max(data["CAP"])
        self.unit = data["unit"].unique()[0]

        for _, ggplot in self.groupby_plot(data, "nl"):
            yield ggplot + p9.expand_limits(y=[0, y_max])