Source code for message_ix_models.report.plot

"""Plots for MESSAGEix-GLOBIOM reporting.

The current set of plots operates on time series data stored on the scenario by
:mod:`message_ix_models.report` or :mod:`message_data` legacy reporting.
"""

import logging
import re
from collections.abc import Iterator, Sequence
from datetime import datetime
from importlib import import_module
from types import ModuleType, SimpleNamespace
from typing import TYPE_CHECKING, Any, Literal

import genno.compat.plotnine
import pandas as pd
import plotnine as p9
from genno import Computer, Key, Keys

from message_ix_models.model.workflow import STAGE

if TYPE_CHECKING:
    from pathlib import Path
    from typing import Protocol

    from genno.core.key import KeyLike

    from message_ix_models import Context
    from message_ix_models.types import PlotAddable

    class HasURL(Protocol):
        url: str


# Public names exported by ``from message_ix_models.report.plot import *``.
__all__ = [
    "EmissionsCO2",
    "FinalEnergy0",
    "FinalEnergy1",
    "LabelFirst",
    "Plot",
    "PlotTimeSeries",
    "PlotFromIAMC",
    "PrimaryEnergy0",
    "PrimaryEnergy1",
    "callback",
    "collect",
    "prepare_computer",
]

# Module-level logger, used by prepare_computer() to report how many plots were added.
log = logging.getLogger(__name__)

# Quiet messages like:
#   "Fontsize 0.00 < 1.0 pt not allowed by FreeType. Setting fontsize= 1 pt"
# The threshold is set one step above INFO, so records at INFO level and below from
# this logger are dropped while WARNING and above still pass.
# TODO Investigate or move upstream
logging.getLogger("matplotlib.font_manager").setLevel(logging.INFO + 1)

#: Reusable components: plotnine themes that fix the figure size, keyed by a
#: paper-format label. The tuples are presumably (width, height) in inches; confirm
#: against the plotnine documentation for ``figure_size``.
COMMON: dict[str, "PlotAddable"] = {
    "A2 landscape": p9.theme(figure_size=(23.4, 16.6)),
    "A3 portrait": p9.theme(figure_size=(11.7, 16.6)),
    "A4 landscape": p9.theme(figure_size=(11.7, 8.3)),
}


[docs] class LabelFirst: """:mod:`plotnine` labeller that labels the first item using a format string. Subsequent items are named with the bare value only. """ __name__: str | None = None def __init__(self, fmt_string): self.fmt_string = fmt_string self.first = True def __call__(self, value: Any) -> str: result = self.fmt_string.format(value) if self.first else str(value) self.first = False return result
class Plot(genno.compat.plotnine.Plot):
    """Base class for plots based on reported time-series data.

    Subclasses should be used like:

    .. code-block:: python

       class MyPlot(Plot):
           ...

       c.add("plot myplot", MyPlot, "scenario")

    …that is, giving "scenario" or another key that points to a :class:`.Scenario`
    object with stored time series data. See the examples in this file.

    The :attr:`single` and :attr:`stage` class attributes can be used to specify the
    context in which the plot is meant to be used:

    - :data:`.STAGE.BUILD`, :py:`single=True`: Plots to be used during the process of
      building a single scenario. These are used within a :class:`~.genno.Computer`,
      without a built or solved Scenario. See for example
      :func:`.transport.build.get_computer`.
    - :data:`.STAGE.BUILD`, :py:`single=False`: Plots for pre-solve or build-process
      analysis/debugging of multiple scenarios. These are added by, for instance,
      :func:`.transport.build.debug_multi`.
    - :data:`.STAGE.REPORT`, :py:`single=True`: Plots for post-solve reporting of
      single MESSAGEix-GLOBIOM scenarios. These are added to a :class:`.Reporter` via
      :func:`.report.prepare_reporter` or a module-specific callback, for instance
      :func:`.transport.report.callback`.
    - :data:`.STAGE.REPORT`, :py:`single=False`: Plots of post-solve reporting of
      multiple MESSAGEix-GLOBIOM scenarios, or of data reported from these. These are
      added by, for instance, :func:`.transport.report.multi`.
    """

    # Narrow upstream type
    # TODO Move upstream to genno
    inputs: Sequence["KeyLike"]

    #: 'Static' geoms: list of plotnine objects that are not dynamic.
    static: list["PlotAddable"] = [COMMON["A2 landscape"]]

    #: Fixed plot title string. If not given, the first line of the class docstring is
    #: used.
    title: str | None = None

    #: Units expression for plot title.
    unit: str | None = None

    #: Workflow stage at which the Plot is to be used.
    stage: Literal[STAGE.BUILD, STAGE.REPORT] = STAGE.REPORT

    #: :any:`True` if the plot is to be used in a :class:`.Reporter` with keys/tasks for
    #: reporting a single scenario. Use :any:`False` for plots of data from multiple
    #: scenarios.
    single: bool = True

    # Object (Scenario, ScenarioInfo, etc.) with a `url` attribute, for ggtitle()
    _scenario: "HasURL"

    @classmethod
    def add_tasks(
        cls, c: Computer, key: "KeyLike", *inputs, strict: bool = False
    ) -> "KeyLike":
        """Add tasks to `c` to generate and save the Plot.

        Beyond the base class method, this method:

        - Constructs an output path from :attr:`basename`, :attr:`suffix`, and the
          :func:`make_output_path` operator. This in turn uses the :py:`c.config` values
          "output_dir" (if :attr:`stage` is :any:`.STAGE.REPORT`) or "build debug dir".
        - If :py:`single is False`, adds a key "scenario" with a dummy URL that can be
          used by :meth:`.ggtitle`.
        - Appends 2 keys for the above to the `inputs` for use by :meth:`save`.

        Parameters
        ----------
        c :
            Computer to which tasks are added.
        key :
            Key for the plot task.
        inputs :
            Keys for the plot inputs. If none are given, :attr:`inputs` is used.
        strict :
            Passed to the parent class method.

        Returns
        -------
        KeyLike
            The value returned by the parent class method.
        """
        # Output path for this parameter
        k_path = Key(key) + "path"

        # Construct the output path for this plot
        # If single=True, config["output_dir"] **may** include a subdirectory from the
        # scenario URL
        c.add(
            k_path,
            "make_output_path",
            "config",
            name=f"{cls.basename}{cls.suffix}",
            config_key="output_dir" if cls.stage is STAGE.REPORT else "build debug dir",
        )

        if not cls.single and c.graph.get("scenario", None) is None:
            # Add a placeholder for ggtitle() formatting of scenario.url
            c.add("scenario", SimpleNamespace(url="Multiple scenarios"))

        # Prepare inputs
        # - Same as parent class: explicit args to add_tasks() or from class attribute
        # - 2 items expected by save(), below
        _inputs = list(inputs if inputs else cls.inputs) + [k_path, "scenario"]

        return super(Plot, cls).add_tasks(c, key, *_inputs, strict=strict)

    def ggtitle(self, extra: str | None = None) -> "PlotAddable":
        """Return :class:`plotnine.ggtitle` including the current date & time.

        The title is built from :attr:`title` (or, if that is not set, the first line
        of the class docstring), :attr:`unit` in square brackets, and `extra`; empty
        parts are omitted. The subtitle contains the URL of the object stored by
        :meth:`save` and the current time to the minute.
        """
        # Surrounding whitespace is stripped so that a docstring beginning with a blank
        # line still yields its first line of text. A subclass with neither `title` nor
        # a docstring gets an empty heading instead of an IndexError.
        lines = (self.title or self.__doc__ or "").strip().splitlines()
        heading = lines[0].rstrip(".") if lines else ""

        title_parts = [
            heading,
            f"[{self.unit}]" if self.unit else None,
            f"— {extra}" if extra else None,
        ]
        subtitle_parts = [
            self._scenario.url,
            "—",
            datetime.now().isoformat(timespec="minutes"),
        ]
        return p9.labs(
            title=" ".join(filter(None, title_parts)), subtitle=" ".join(subtitle_parts)
        )

    def groupby_plot(
        self, data: pd.DataFrame, *args
    ) -> Iterator[tuple[Any, "p9.ggplot"]]:
        """Combination of groupby and ggplot().

        Groups by `args` and yields a series of :class:`plotnine.ggplot` objects, one
        per group, with :attr:`static` geoms and :func:`ggtitle` appended to each.

        Parameters
        ----------
        data :
            Data to be split into groups.
        args :
            One or more groupers accepted by :meth:`pandas.DataFrame.groupby`: column
            names, or array-likes such as a boolean :class:`pandas.Series`. A single
            grouper is passed through unchanged, so group keys keep their scalar form;
            two or more are passed together as a list.

        Yields
        ------
        tuple
            :py:`(group_key, ggplot)` for each group.
        """

        def _label(arg) -> str:
            """Return a short text label for one grouper, for the plot title."""
            if isinstance(arg, str):
                return arg
            if isinstance(arg, (list, tuple)):
                return "-".join(map(_label, arg))
            # For instance a pandas.Series: prefer its name over its full repr
            name = getattr(arg, "name", None)
            return str(arg if name is None else name)

        # Unpacking `args` into groupby() would pass the 2nd and later groupers as the
        # `axis`, `level`, … parameters; group on all of them instead.
        grouped = data.groupby(*args) if len(args) < 2 else data.groupby(list(args))

        # str.join() accepts only str items, but groupers may be other objects
        label = "-".join(map(_label, args))

        for group_key, group_df in grouped:
            yield (
                group_key,
                (
                    p9.ggplot(group_df)
                    + self.static
                    + self.ggtitle(f"{label}={group_key!r}")
                ),
            )

    def save(self, config, *args, **kwargs) -> "Path | None":
        """Store the last 2 `args` appended by :meth:`add_tasks`.

        These are the output path, stored as :py:`self.path`, and the object with a
        `url` attribute, stored as :py:`self._scenario`. The remaining `args` and all
        `kwargs` are passed to the parent class method, whose return value is returned.
        """
        *_args, self.path, self._scenario = args

        # Call the parent method with the remaining arguments
        return super().save(config, *_args, **kwargs)
class PlotTimeSeries(Plot):
    """Plot of time series data from a scenario."""

    single = True

    #: List of regular expressions corresponding to :attr:`inputs`. These are passed as
    #: the `expr` argument to :func:`.filter_ts` to filter the entire set of time series
    #: data.
    inputs_regex: list[re.Pattern] = []

    @classmethod
    def add_tasks(
        cls, c: "Computer", key: "KeyLike", *inputs, strict: bool = False
    ) -> "KeyLike":
        """Add tasks that retrieve the time series data, then the plot itself.

        At most 1 of `inputs` may be given: the key for the scenario from which time
        series data are retrieved. If omitted, "scenario" is used.

        - If :attr:`inputs_regex` is empty, one "get_ts" task is added per entry in
          :attr:`inputs`, selecting the variable with the same name as the key.
        - Otherwise, all time series data are retrieved once, and one "filter_ts" task
          is added per pair of items from :attr:`inputs` and :attr:`inputs_regex`.

        Raises
        ------
        ValueError
            if more than 1 of `inputs` is given.
        """
        from copy import copy
        from itertools import zip_longest

        if len(inputs) > 1:
            raise ValueError(f"Expected at most 1 inputs; got {inputs}")
        k_scenario = inputs[0] if inputs else "scenario"

        if not cls.inputs_regex:
            for name in cls.inputs:
                k_var = Key(name)
                # Add a computation to get the time series data for a specific variable
                c.add(k_var, "get_ts", k_scenario, dict(variable=k_var.name))
        else:
            # Retrieve all time series data, for advanced filtering
            k_all = Key(k_scenario) + "iamc"
            c.add(k_all, "get_ts", k_scenario)

            # Iterate over matched items from `inputs` and `inputs_regex`
            for k_var, pattern in zip_longest(cls.inputs, cls.inputs_regex):
                if pattern is None:
                    # No expression for this or any later input
                    break
                # Filter the data given by `pattern` from all::iamc
                c.add(k_var, "filter_ts", k_all, copy(pattern))

        # Add the plot itself
        return super().add_tasks(c, key, strict=strict)
class PlotFromIAMC(Plot):
    """:class:`Plot` that uses a subset of data from an IAMC-structured input.

    :attr:`.Plot.inputs` must be of length 1 and include the dimensions
    :math:`(n, s, u, v, y)`.
    """

    #: :py:`variable` argument to :func:`genno.compat.pyam.operator.quantity_from_iamc`.
    iamc_variable_pattern: str

    @classmethod
    def add_tasks(
        cls, c: Computer, key: "KeyLike", *inputs, strict: bool = False
    ) -> "KeyLike":
        """Select a subset of data and reduce its dimensionality.

        No `inputs` may be given; the single entry of :attr:`.Plot.inputs` is always
        used. A "quantity_from_iamc" task is added at a key named for :attr:`basename`
        with dimensions :math:`(n, s, y)` and tag "in"; this key is the only input
        passed on to the parent class method.
        """
        # Exactly 1 class-level input, and none given explicitly
        assert len(cls.inputs) == 1
        assert not inputs

        k = Keys(input=cls.inputs[0], subset=Key(cls.basename, "nsy", "in"))

        # The input must have (n, s, y) and at least 1 further dimension
        assert {"n", "s", "y"} < set(k.input.dims)

        c.add(
            k.subset,
            "quantity_from_iamc",
            k.input,
            variable=cls.iamc_variable_pattern,
        )

        # Call the upstream method
        return super().add_tasks(c, key, k.subset, strict=strict)
class EmissionsCO2(PlotTimeSeries):
    """CO₂ Emissions.

    Line-and-point plot of values by year, with one colour per region.
    """

    basename = "emission-CO2"
    inputs = ["Emissions|CO2::iamc"]

    static = Plot.static + [
        p9.aes(x="year", y="value", color="region"),
        p9.geom_line(),
        p9.geom_point(),
        p9.labs(x="Period", y="", color="Region"),
    ]

    def generate(self, data: pd.DataFrame):
        """Yield one plot per group of regions.

        Rows of `data` are split by whether the "region" label contains "GLB". For each
        group, the y-axis limits are expanded to include 0, and a title without any
        group label is appended.
        """
        # Units for the title: the first distinct value in the "unit" column
        self.unit = data["unit"].unique()[0]

        is_global = data.region.str.contains("GLB")

        for _key, plot in self.groupby_plot(data, is_global):
            upper = max(plot.data["value"])
            yield plot + p9.expand_limits(y=[0, upper]) + self.ggtitle("")
class FinalEnergy0(EmissionsCO2):
    """Final Energy.

    Same aesthetics, geoms, and :meth:`generate` as :class:`EmissionsCO2`, applied to
    the "Final Energy" time series.
    """

    # NB The first docstring line above is used by Plot.ggtitle() as the plot title
    #    when `title` is not set; keep it unchanged when editing the docstring.

    # Base name for the output file
    basename = "fe0"
    # Single input: time series data for the variable "Final Energy"
    inputs = ["Final Energy::iamc"]
class FinalEnergy1(PlotTimeSeries):
    """Final Energy.

    Bar plot of values by year, filled by variable, with one plot per region.
    """

    basename = "fe1"
    inputs = ["fe1-0::iamc"]

    # Commodities: the 2nd part of the variable names to be included
    _c = [
        "Electricity",
        "Gases",
        "Geothermal",
        "Heat",
        "Hydrogen",
        "Liquids",
        "Solar",
        "Solids",
    ]
    # Matches "Final Energy|" followed by any one of the commodity names
    inputs_regex = [re.compile(r"Final Energy\|(" + "|".join(_c) + ")")]

    static = Plot.static + [
        p9.aes(x="year", y="value", fill="variable"),
        p9.geom_bar(stat="identity", size=5.0),  # 5.0 is the minimum spacing of "year"
        p9.labs(x="Period", y="", fill="Commodity"),
    ]

    def generate(self, data: pd.DataFrame):
        """Yield one plot per distinct value in the "region" column of `data`."""
        # Units for the title: the first distinct value in the "unit" column
        self.unit = data["unit"].unique()[0]

        # Discard the group keys; only the plots are needed
        yield from (plot for _key, plot in self.groupby_plot(data, "region"))
class PrimaryEnergy0(EmissionsCO2):
    """Primary Energy.

    Same aesthetics, geoms, and :meth:`generate` as :class:`EmissionsCO2`, applied to
    the "Primary Energy" time series.
    """

    basename = "pe0"

    # Only the time series key is listed. "scenario" must not appear here:
    # - PlotTimeSeries.add_tasks() adds a "get_ts" task for every entry in `inputs`, so
    #   it would overwrite the "scenario" key with a task that refers to itself; and
    # - every entry becomes a positional argument to generate(), which accepts only
    #   `data`.
    # Plot.add_tasks() already appends "scenario" for use by save().
    inputs = ["Primary Energy::iamc"]
class PrimaryEnergy1(FinalEnergy1):
    """Primary Energy.

    Same aesthetics, geoms, and :meth:`generate` as :class:`FinalEnergy1`, applied to
    a subset of the "Primary Energy|…" time series.
    """

    # NB The first docstring line above is used by Plot.ggtitle() as the plot title
    #    when `title` is not set; keep it unchanged when editing the docstring.

    # Base name for the output file
    basename = "pe1"
    # Key at which the filtered time series data are stored
    inputs = ["pe1-0::iamc"]

    # Prefixes of the 2nd part of variable names to be excluded
    _omit = ["Fossil", "Non-Biomass Renewables", "Secondary Energy Trade"]
    # Matches "Primary Energy|" followed by a name that (a) does not begin with any
    # entry in `_omit` (negative lookahead) and (b) is captured only up to the next "|".
    # NOTE(review): whether deeper sub-variables like "Primary Energy|A|B" are excluded
    # depends on how filter_ts applies the pattern (full vs. partial match) — confirm.
    inputs_regex = [re.compile(rf"Primary Energy\|((?!{'|'.join(_omit)})[^\|]*)")]
def callback(c: Computer, context: "Context") -> None:
    """Add all plots collected from this module to `c`.

    Also add a key "plot all" that triggers the generation of all plots.

    `context` is accepted but not used.
    """
    prepare_computer(c, module=__name__, target="plot all")
def collect(
    module: str | ModuleType, stage: STAGE | None = None, single: bool | None = None
) -> Iterator[type[Plot]]:
    """Iterate over plots from `module`.

    If `stage` or `single` are given, collect() iterates over only those plots where
    the attributes of the same name match.

    Parameters
    ----------
    module :
        A module object, or the name of a module to be imported.
    stage :
        If not :any:`None`, yield only classes with an equal :attr:`.Plot.stage`.
    single :
        If not :any:`None`, yield only classes with an equal :attr:`.Plot.single`.

    Yields
    ------
    type
        Subclasses of :class:`Plot` found among the attributes of `module`, in the
        order given by :func:`dir`. The base classes :class:`Plot`,
        :class:`PlotFromIAMC`, and :class:`PlotTimeSeries` themselves are skipped.
    """
    mod = module if not isinstance(module, str) else import_module(module)

    for name in dir(mod):
        candidate = getattr(mod, name)

        # Only classes derived from Plot…
        if not (isinstance(candidate, type) and issubclass(candidate, Plot)):
            continue
        # …that are concrete, not the generic base classes
        if candidate in (Plot, PlotFromIAMC, PlotTimeSeries):
            continue
        # Apply the filters; None means "no filter"
        if stage not in {None, candidate.stage}:
            continue
        if single not in {None, candidate.single}:
            continue

        yield candidate
def prepare_computer(
    c: Computer,
    module: str | ModuleType | None = None,
    target: "KeyLike | None" = None,
    *,
    stage: STAGE | None = None,
    single: bool | None = None,
) -> None:
    """Add plots to `c` from `module`.

    Each collected plot class is added at a key "plot {basename}".

    Parameters
    ----------
    module
        Module from which to collect plots. If not given, the current module is used.
    stage
    single
        Passed to :func:`collect`.
    target
        If given, add a task at this key that collects and summarizes all added plots.
    """
    # Force matplotlib to use a non-interactive backend for plotting
    import matplotlib

    matplotlib.use("pdf")

    # Iterate over the Plot subclasses defined in the given or current module
    keys = []
    for plot_cls in collect(module or __name__, stage=stage, single=single):
        plot_key = f"plot {plot_cls.basename}"
        keys.append(plot_key)
        c.add(plot_key, plot_cls)

    if not target:
        log.info(f"Added {len(keys)} plots")
        return

    log.info(f"Add {target!r} collecting {len(keys)} plots")
    c.add(target, "summarize", *keys)