"""Plots for MESSAGEix-Transport reporting."""
import logging
from datetime import datetime
from pathlib import Path
from typing import TYPE_CHECKING, Optional
import genno.compat.plotnine
import pandas as pd
import plotnine as p9
from genno import Computer
from iam_units import registry
from .key import gdp_cap, pdt_nyt
if TYPE_CHECKING:
import plotnine.typing
from genno.core.key import KeyLike
from .config import Config
log = logging.getLogger(__name__)
# Quiet messages like:
# "Fontsize 0.00 < 1.0 pt not allowed by FreeType. Setting fontsize= 1 pt"
# TODO Investigate or move upstream
logging.getLogger("matplotlib.font_manager").setLevel(logging.INFO + 1)
[docs]
class LabelFirst:
"""Labeller that labels the first item using a format string.
Subsequent items are named with the bare value only.
"""
__name__: Optional[str] = None
[docs]
def __init__(self, fmt_string):
self.fmt_string = fmt_string
self.first = True
def __call__(self, value):
first = self.first
self.first = False
return self.fmt_string.format(value) if first else value
[docs]
class Plot(genno.compat.plotnine.Plot):
"""Base class for plots.
This class extends :class:`genno.compat.plotnine.Plot` with extra features.
"""
#: 'Static' geoms: list of plotnine objects that are not dynamic
static: list["plotnine.typing.PlotAddable"] = [
p9.theme(figure_size=(11.7, 8.3)),
]
#: Fixed plot title string. If not given, the first line of the class docstring is
#: used.
title: Optional[str] = None
#: Units expression for plot title.
unit: Optional[str] = None
#: :obj:`False` for plots not intended to be run on a solved scenario.
runs_on_solved_scenario: bool = True
[docs]
def ggtitle(self, extra: Optional[str] = None):
"""Return :class:`plotnine.ggtitle` including the current date & time."""
title_parts = [
(self.title or self.__doc__ or "").splitlines()[0].rstrip("."),
f"[{self.unit}]" if self.unit else None,
f"— {extra}" if extra else None,
]
subtitle_parts = [
getattr(self.scenario, "url", "no Scenario"),
"—",
datetime.now().isoformat(timespec="minutes"),
]
return p9.labs(
title=" ".join(filter(None, title_parts)), subtitle=" ".join(subtitle_parts)
)
[docs]
def groupby_plot(self, data: pd.DataFrame, *args):
"""Combination of groupby and ggplot().
Groups by `args` and yields a series of :class:`plotnine.ggplot` objects, one
per group, with :attr:`static` geoms and :func:`ggtitle` appended to each.
"""
for group_key, group_df in data.groupby(*args):
yield (
group_key,
(
p9.ggplot(group_df)
+ self.static
+ self.ggtitle(f"{'-'.join(args)}={group_key!r}")
),
)
[docs]
def save(self, config, *args, **kwargs) -> Optional[Path]:
# Strip off the last of `args`, a pre-computed path, and store
*_args, self.path, self.scenario = args
# Call the parent method with the remaining arguments
return super().save(config, *_args, **kwargs)
[docs]
@classmethod
def add_tasks(
cls, c: Computer, key: "KeyLike", *inputs, strict: bool = False
) -> "KeyLike":
"""Use a custom output path."""
from operator import itemgetter
from genno import quote
# Output path for this parameter
k_path = f"plot {cls.basename} path"
filename = f"{cls.basename}{cls.suffix}"
if cls.runs_on_solved_scenario:
# Make a path including the Scenario URL
c.add(k_path, "make_output_path", "config", name=filename)
else:
# Build phase: no Scenario/URL exists; use a path set by add_debug()
c.add(f"{k_path} 0", itemgetter("transport build debug dir"), "config")
c.add(k_path, Path.joinpath, f"{k_path} 0", quote(filename))
# Same as the parent method
_inputs = list(inputs if inputs else cls.inputs)
# Append the key for `path` to the inputs
return super(Plot, cls).add_tasks(
c, key, *_inputs, k_path, "scenario", strict=strict
)
[docs]
class BaseEnergy0(Plot):
"""Transport final energy intensity of GDP."""
basename = "base-fe-intensity-gdp"
inputs = ["fe intensity:nl-ya:units"]
static = Plot.static + [
p9.aes(x="ya", y="value", color="nl"),
p9.geom_line(),
p9.geom_point(),
p9.labs(x="Period", y="", color="Node"),
p9.expand_limits(y=[0, 0.1]),
]
[docs]
def generate(self, data):
self.unit = data["unit"].unique()[0]
return p9.ggplot(data) + self.static + self.ggtitle()
[docs]
class CapNewLDV(Plot):
# FIXME remove hard-coded units
"""New LDV capacity [10⁶ vehicle]."""
basename = "cap-new-t-ldv"
inputs = ["historical_new_capacity:nl-t-yv:ldv", "CAP_NEW:nl-t-yv:ldv"]
static = Plot.static + [
p9.aes(x="yv", y="value", color="t"),
p9.geom_vline(xintercept=2020, size=4, color="white"),
p9.geom_line(),
p9.geom_point(),
p9.labs(x="Period", y="", color="LDV technology"),
]
[docs]
def generate(self, data0, data1):
# - Concatenate data0 (values in "historical_new_capacity" column) and
# data1 (values in "CAP_NEW" column).
# - Fill with zeros.
# - Compute a "value" column: one or the other.
# - Remove some errant values for R12_GLB.
# FIXME Investigate and remove the source
data = (
pd.concat([data0, data1])
.fillna(0)
.eval("value = CAP_NEW + historical_new_capacity")
.query("nl != 'R12_GLB'")
)
yield from [ggplot for _, ggplot in self.groupby_plot(data, "nl")]
[docs]
def read_csvs(stem: str, *paths: Path, **kwargs) -> pd.DataFrame:
"""Read and concatenate data for debugging plots.
- Read data from files named :file:`{stem}.csv` in each of `paths`.
- Store with shortened scenario labels extracted from the `paths`.
- Concatenate to a single data frame with a "scenario" column.
"""
def label_from(dirname: Path) -> str:
"""Extract e.g. "SSP(2024).2-R12-B" from "debug-ICONICS_SSP(2024).2-R12-B"."""
return dirname.parts[-1].split("ICONICS_", maxsplit=1)[-1]
kwargs.setdefault("comment", "#")
return pd.concat(
{
label_from(p): pd.read_csv(p.joinpath(f"{stem}.csv"), **kwargs)
for p in paths
},
names=["scenario"],
).reset_index("scenario")
[docs]
class ComparePDT(Plot):
"""Passenger activity.
This plot is used in :func:`.transport.build.debug_multi`, not in ordinary
reporting. Rather than receiving data from computed quantities already in the graph,
it reads them from files named :file:`pdt.csv` (per :attr:`.kind`) in the
directories generated by the workflow steps like "SSP1 debug build" (
:func:`.transport.build.debug`).
- One page per |n|.
- 5 horizontal panels for |t| (=transport modes).
- One line with points per scenario, coloured by scenario.
"""
runs_on_solved_scenario = False
basename = "compare-pdt"
static = Plot.static + [
p9.aes(x="y", y="value", color="scenario"),
p9.facet_wrap("t", ncol=5),
p9.geom_line(),
p9.geom_point(size=0.5),
p9.scale_y_log10(),
p9.labs(y="Activity"),
]
#: Base name for source data files, for instance :file:`pdt.csv`.
kind = "pdt"
#: Units of input files
unit = "km/a"
#: Unit adjustment factor.
factor = 1e6
[docs]
def generate(self, *paths: Path):
data = read_csvs(self.kind, *paths).eval("value = value / @self.factor")
# Add factor to the unit expression
if self.factor != 1.0:
self.unit = f"{self.factor:.0e} {self.unit}"
for _, ggplot in self.groupby_plot(data, "n"):
yield ggplot + p9.expand_limits(y=[5e-2, max(data["value"])])
[docs]
class ComparePDTCap0(ComparePDT):
"""Passenger activity per capita.
Identical to :class:`.ComparePDT`, but reads from :file:`pdt-cap.csv` instead.
"""
basename = "compare-pdt-cap"
kind = "pdt-cap"
factor = 1e3
#: Common layers for :class:`ComparePDTCap1` and :class:`DemandExoCap1`.
PDT_CAP_GDP_STATIC = Plot.static + [
p9.aes(x="gdp", y="value", color="t"),
p9.geom_line(),
p9.geom_point(),
p9.scale_x_log10(),
p9.scale_y_log10(),
p9.labs(x="GDP [10³ USD_2017 / capita]", y="", color="Transport mode"),
]
[docs]
class ComparePDTCap1(Plot):
"""Passenger activity.
Similar to :class:`DemandExoCap1`, except comparing multiple scenarios.
- One page per |n|.
- 5 horizontal panels for scenarios.
- One line with points per |t| (=transport mode), coloured by mode.
"""
runs_on_solved_scenario = False
basename = "compare-pdt-capita-gdp"
static = PDT_CAP_GDP_STATIC + [
p9.facet_wrap("scenario", ncol=5),
]
unit = "km/a"
[docs]
def generate(self, *paths: Path):
# Read data
df_pdt = read_csvs("pdt-cap", *paths)
df_gdp = read_csvs("gdp-ppp-cap", *paths)
# Merge data from two quantities; keep separate column names
# NB Same as DemandExoCap1, except on=["scenario", …]
data = df_pdt.merge(
df_gdp.rename(columns={"value": "gdp", "unit": "gdp_unit"}),
on=["scenario", "n", "y"],
)
# Set limits for log-log plot
stats = data.describe()
# NB Do not set common x limits across pages/nodes; only within.
limits = p9.expand_limits(y=[3e1, stats.loc["max", "value"]])
yield from [ggplot + limits for _, ggplot in self.groupby_plot(data, "n")]
[docs]
class InvCost0(Plot):
"""All transport investment cost."""
basename = "inv-cost-transport"
inputs = ["inv_cost:nl-t-yv:transport all"]
static = Plot.static + [
p9.aes(x="yv", y="inv_cost", color="t"),
p9.geom_line(),
p9.geom_point(),
]
[docs]
def generate(self, data):
y_max = max(data["inv_cost"])
self.unit = data["unit"].unique()[0]
for _, ggplot in self.groupby_plot(data, "nl"):
yield ggplot + p9.expand_limits(y=[0, y_max])
[docs]
class InvCost1(InvCost0):
"""LDV transport investment cost.
Same as InvCost0, but for LDV techs only.
"""
basename = "inv-cost-ldv"
inputs = ["inv_cost:nl-t-yv:ldv"]
[docs]
class InvCost2(InvCost0):
"""Non-LDV transport investment cost.
Same as InvCost0, but for non-LDV techs only.
"""
basename = "inv-cost-nonldv"
inputs = ["inv_cost:nl-t-yv:non-ldv"]
[docs]
class FixCost(Plot):
"""Fixed cost."""
basename = "fix-cost"
inputs = ["fix_cost:nl-t-yv-ya:transport all"]
static = Plot.static + [
p9.aes(x="ya", y="fix_cost", color="t", group="t * yv"),
p9.geom_line(),
p9.geom_point(),
]
[docs]
def generate(self, data):
y_max = max(data["fix_cost"])
self.unit = data["unit"].unique()[0]
for _, ggplot in self.groupby_plot(data, "nl"):
yield ggplot + p9.expand_limits(y=[0, y_max])
[docs]
class VarCost(Plot):
"""Variable cost."""
basename = "var-cost"
inputs = ["var_cost:nl-t-yv-ya:transport all"]
static = Plot.static + [
p9.aes(x="ya", y="var_cost", color="t", group="t * yv"),
p9.geom_line(),
p9.geom_point(),
]
[docs]
def generate(self, data):
y_max = max(data["var_cost"])
self.unit = data["unit"].unique()[0]
for nl, ggplot in self.groupby_plot(data, "nl"):
yield ggplot + p9.expand_limits(y=[0, y_max])
[docs]
class LDV_IO(Plot):
"""Input efficiency [GWa / km]."""
basename = "ldv-efficiency"
inputs = ["input:nl-t-yv-ya:transport all"]
static = Plot.static + [
p9.aes(x="ya", y="input", color="t"),
# TODO remove typing exclusion once plotnine >0.12.4 is released
p9.facet_wrap(
["nl"],
ncol=2,
labeller=LabelFirst("node: {}"), # type: ignore [arg-type]
),
p9.geom_line(),
p9.geom_point(),
p9.labs(x="Period", y="", color="LDV technology"),
]
[docs]
def generate(self, data):
return p9.ggplot(data) + self.static + self.ggtitle()
[docs]
class OutShareLDV0(Plot):
"""Share of total LDV output [Ø]."""
basename = "out-share-t-ldv"
inputs = ["out:nl-t-ya:ldv+units"]
static = Plot.static + [
p9.aes(x="ya", y="value", fill="t"),
p9.geom_bar(stat="identity", width=4),
# # Select a palette with up to 12 colors
# p9.scale_fill_brewer(type="qual", palette="Set3"),
p9.labs(x="Period", y="", fill="LDV technology"),
]
[docs]
def generate(self, data):
# Normalize data
# TODO Do this in genno
data["value"] = data["value"] / data.groupby(["nl", "ya"])["value"].transform(
"sum"
)
yield from [ggplot for _, ggplot in self.groupby_plot(data, "nl")]
[docs]
class OutShareLDV1(Plot):
"""Share of LDV usage [Ø]."""
basename = "out-share-t-cg-ldv"
inputs = ["out:nl-t-ya-c", "cg"]
static = Plot.static + [
p9.aes(x="ya", y="value", fill="t"),
p9.facet_wrap(["c"], ncol=5),
p9.geom_bar(stat="identity", width=4),
p9.labs(x="Period", y="", fill="LDV technology"),
]
[docs]
def generate(self, data, cg):
# TODO do these operations in reporting for broader reuse
# - Recover the consumer group code from the commodity code.
# - Select only the consumer groups.
# - Recover the LDV technology code from the usage technology code.
data = (
data.assign(c=lambda df: df.c.str.replace("transport pax ", ""))
.query("c in @cg")
.assign(t=lambda df: df.t.str.split(" usage by ", expand=True)[0])
)
# Normalize data
data["value"] = data["value"] / data.groupby(["c", "nl", "ya"])[
"value"
].transform("sum")
yield from [ggplot for _, ggplot in self.groupby_plot(data, "nl")]
[docs]
def c_group(df: pd.DataFrame, cg):
return df.assign(
c_group=df.c.apply(
lambda v: "transport pax LDV" if any(cg_.id in v for cg_ in cg) else v
)
)
[docs]
class Demand0(Plot):
"""Passenger transport demand [pass · km / a]."""
basename = "demand"
inputs = ["demand:n-c-y", "c::transport", "cg"]
static = Plot.static + [
p9.aes(x="y", y="demand", fill="c_group"),
p9.geom_bar(stat="identity", width=4),
p9.labs(x="Period", y="", fill="Transport mode"),
]
@staticmethod
def _prep_data(data, commodities, cg):
# Convert and select data
_commodity = list(map(str, commodities))
return (
data.query("c in @_commodity")
.pipe(c_group, cg)
.groupby(["c_group", "n", "y"])
.aggregate({"demand": "sum"})
.reset_index()
)
[docs]
def generate(self, data, commodities, cg):
data = self._prep_data(data, commodities, cg)
yield from [ggplot for _, ggplot in self.groupby_plot(data, "n")]
[docs]
class Demand1(Demand0):
"""Share of transport demand [Ø]."""
basename = "demand-share"
[docs]
def generate(self, data, commodities, cg):
data = self._prep_data(data, commodities, cg)
# Normalize
data["demand"] = data["demand"] / data.groupby(["n", "y"])["demand"].transform(
"sum"
)
yield from [ggplot for _, ggplot in self.groupby_plot(data, "n")]
[docs]
class DemandCap(Plot):
"""Transport demand per capita [km / a]."""
basename = "demand-capita"
inputs = ["demand:n-c-y:capita", "c::transport", "cg"]
static = Plot.static + [
p9.aes(x="y", y="value", fill="c"),
p9.geom_bar(stat="identity", width=4),
p9.labs(x="Period", y="", fill="Transport mode group"),
]
[docs]
def generate(self, data, commodities, cg):
# Convert and select data
data = data.query(f"c in {repr(list(map(str, commodities)))}").pipe(c_group, cg)
for _, ggplot in self.groupby_plot(data, "n"):
yield ggplot
def _reduce_units(df: pd.DataFrame, target_units) -> tuple[pd.DataFrame, str]:
df_units = df["unit"].unique()
assert 1 == len(df_units)
tmp = registry.Quantity(1.0, df_units[0]).to(target_units)
return (
df.eval("value = value * @tmp.magnitude").assign(unit=f"{tmp.units:~}"),
f"{tmp.units:~}",
)
[docs]
class DemandExo(Plot):
"""Passenger transport activity."""
runs_on_solved_scenario = False
basename = "demand-exo"
inputs = [pdt_nyt]
static = Plot.static + [
p9.aes(x="y", y="value", fill="t"),
p9.geom_bar(stat="identity", width=4),
p9.labs(x="Period", y="", fill="Mode (tech group)"),
]
[docs]
def generate(self, data):
# FIXME shouldn't need to change dtype here
data = data.astype(dict(value=float))
data, self.unit = _reduce_units(data, "Gp km / a")
y_max = max(data["value"])
for _, ggplot in self.groupby_plot(data, "n"):
yield ggplot + p9.expand_limits(y=[0, y_max])
[docs]
class DemandExoCap0(Plot):
"""Passenger transport activity per person."""
runs_on_solved_scenario = False
basename = "demand-exo-capita"
inputs = [pdt_nyt + "capita+post"]
static = Plot.static + [
p9.aes(x="y", y="value", fill="t"),
p9.geom_bar(stat="identity", width=4),
p9.labs(x="Period", y="", fill="Transport mode"),
]
[docs]
def generate(self, data):
# FIXME shouldn't need to change dtype here
data = data.astype(dict(value=float))
data, self.unit = _reduce_units(data, "Mm / a")
y_max = max(data["value"])
for _, ggplot in self.groupby_plot(data, "n"):
yield ggplot + p9.expand_limits(y=[0, y_max])
[docs]
class DemandExoCap1(DemandExoCap0):
"""Transport demand per capita.
Unlike :class:`DemandExoCap0`, this uses GDP per capita as the abscissa/x-aesthetic.
"""
runs_on_solved_scenario = False
basename = "demand-exo-capita-gdp"
inputs = [pdt_nyt + "capita+post", gdp_cap]
static = PDT_CAP_GDP_STATIC
[docs]
def generate(self, df_pdt, df_gdp):
# Merge data from two quantities; keep separate column names
data = df_pdt.merge(
df_gdp.rename(columns={"value": "gdp", "unit": "gdp_unit"}), on=["n", "y"]
)
data, self.unit = _reduce_units(data, "km / a")
# Set limits for log-log plot
stats = data.describe()
limits = p9.expand_limits(
x=stats.loc[["min", "max"], "gdp"], y=[3e1, stats.loc["max", "value"]]
)
yield from [ggplot + limits for _, ggplot in self.groupby_plot(data, "n")]
[docs]
class EnergyCmdty0(Plot):
"""Energy input to transport [GWa]."""
basename = "energy-c"
inputs = ["y0", "in:nl-ya-c:transport all"]
static = Plot.static + [
p9.aes(x="ya", y="value", fill="c"),
p9.geom_bar(stat="identity", width=5, color="black"),
p9.labs(x="Period", y="Energy", fill="Commodity"),
]
[docs]
def generate(self, y0: int, data):
# Discard data for certain commodities
data = data[
~(
data.c.str.startswith("transport")
| (data.c == "disutility")
| (data.ya < y0)
)
]
for _, ggplot in self.groupby_plot(data, "nl"):
yield ggplot
[docs]
class EnergyCmdty1(EnergyCmdty0):
"""Share of energy input to transport [0]."""
basename = "energy-c-share"
[docs]
def generate(self, y0: int, data):
# Discard data for certain commodities
data = data[
~(
data.c.str.startswith("transport")
| (data.c == "disutility")
| (data.ya < y0)
)
]
# Normalize data
# TODO Do this in genno
data["value"] = data["value"] / data.groupby(["nl", "ya"])["value"].transform(
"sum"
)
for _, ggplot in self.groupby_plot(data, "nl"):
yield ggplot
[docs]
class Stock0(Plot):
"""LDV transport vehicle stock."""
basename = "stock-ldv"
# Partial sum over driver_type dimension
inputs = ["CAP:nl-t-ya:ldv+units"]
static = Plot.static + [
p9.aes(x="ya", y="CAP", color="t"),
p9.geom_line(),
p9.geom_point(),
p9.labs(x="Period", y="", color="Powertrain technology"),
]
[docs]
def generate(self, data):
y_max = max(data["CAP"])
self.unit = data["unit"].unique()[0]
for _, ggplot in self.groupby_plot(data, "nl"):
yield ggplot + p9.expand_limits(y=[0, y_max])
[docs]
class Stock1(Plot):
"""Non-LDV transport vehicle stock.
Same as Stock0, but for non-LDV techs only.
"""
basename = "stock-non-ldv"
inputs = ["CAP:nl-t-ya:non-ldv+units"]
static = Plot.static + [
p9.aes(x="ya", y="CAP", color="t"),
p9.geom_line(),
p9.geom_point(),
p9.labs(x="Period", y="", color="Powertrain technology"),
]
[docs]
def generate(self, data):
if not len(data):
return
y_max = max(data["CAP"])
self.unit = data["unit"].unique()[0]
for _, ggplot in self.groupby_plot(data, "nl"):
yield ggplot + p9.expand_limits(y=[0, y_max])
[docs]
def prepare_computer(c: Computer):
"""Add :data:`.PLOTS` to `c`.
Adds:
- 1 key like **"plot inv-cost"** corresponding to the :attr:`~.Plot.basename` of
each :class:`.Plot` subclass defined in this module.
- The key **"transport plots"** that triggers writing all the plots to file.
"""
import matplotlib
# Force matplotlib to use a non-interactive backend for plotting
matplotlib.use("pdf")
keys = []
config: "Config" = c.graph["config"]["transport"]
# Iterate over the Plot subclasses defined in the current module
for plot in filter(
lambda cls: isinstance(cls, type) and issubclass(cls, Plot) and cls is not Plot,
globals().values(),
):
if (not plot.runs_on_solved_scenario and config.with_solution) or (
False # Use True here or uncomment below to skip some or all plots
# "stock" not in plot.basename
):
log.info(f"Skip {plot}")
continue
keys.append(f"plot {plot.basename}")
c.add(keys[-1], plot)
key = "transport plots"
log.info(f"Add {repr(key)} collecting {len(keys)} plots")
c.add(key, keys)