"""Handle data from the ADVANCE project."""
import logging
from pathlib import Path
from typing import List, Optional, Tuple
from zipfile import ZIP_DEFLATED, ZipFile

import numpy as np
import pandas as pd
import pint
from genno import Quantity

from message_ix_models.util import (
    cached,
    local_data_path,
    maybe_query,
    private_data_path,
)

log = logging.getLogger(__name__)

#: Expected location of the ADVANCE WP2 data snapshot.
LOCATION = "advance", "advance_compare_20171018-134445.csv.zip"

#: Name of the data file within the archive.
NAME = "advance_compare_20171018-134445.csv"

#: Standard dimensions for data produced as snapshots from the IIASA ENE Program
#: “WorkDB”.
#:
#: .. todo:: Move to a common location for use with other snapshots in the same format.
DIMS = ["model", "scenario", "region", "variable", "unit", "year"]


@cached
def get_advance_data(query: Optional[str] = None) -> pd.Series:
    """Return data from the ADVANCE Work Package 2 data snapshot at :data:`LOCATION`.

    Parameters
    ----------
    query : str, optional
        Passed to :meth:`pandas.DataFrame.query` to limit the returned values.

    Returns
    -------
    pandas.Series
        with a :class:`pandas.MultiIndex` having the levels :data:`.DIMS`.
    """
    try:
        path = private_data_path(*LOCATION)
    except TypeError:
        path = local_data_path(*LOCATION)

    return _read_workdb_snapshot(path, NAME).pipe(maybe_query, query)
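
# Example usage (a sketch; assumes the snapshot archive exists at one of the
# paths above, and the variable/region names are hypothetical):
#
#     data = get_advance_data("variable == 'Population' and region == 'World'")
#     data.head()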


def advance_data(variable: str, query: Optional[str] = None) -> Quantity:
    """Return a single ADVANCE data `variable` as a :class:`genno.Quantity`.

    Parameters
    ----------
    query : str, optional
        Passed to :func:`get_advance_data`.

    Returns
    -------
    genno.Quantity
        with the dimensions :data:`.DIMS` and name `variable`. If the units of the
        data for `variable` are consistent and parseable by :mod:`pint`, the returned
        Quantity has these units; otherwise units are discarded and the returned
        Quantity is dimensionless.
    """
    data = (
        get_advance_data(query)
        .rename("value")
        .xs(variable, level="variable")
        .reset_index("unit")
    )
    if len(data.unit.unique()) > 1:  # pragma: no cover
        log.info(f"Non-unique units for {variable!r}; discarded")
        units = ""
    else:
        units = data.unit.iloc[0]

    result = Quantity(data["value"], name=variable)
    try:
        result.units = units
    except pint.errors.PintError as e:  # pragma: no cover
        log.info(f'"{e}" when parsing {units!r}; discarded')

    return result
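
# Example usage (a sketch; the variable name is hypothetical and must exist in
# the snapshot for the call to succeed):
#
#     qty = advance_data("Population", query="region == 'World'")
#     qty.units  # a pint unit, if the snapshot's units were parseable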


@cached
def _read_workdb_snapshot(path: Path, name: str) -> pd.Series:
    """Read the data file.

    The expected format is a ZIP archive at `path` containing a member at `name` in
    CSV format, with columns corresponding to :data:`DIMS`, except for “year”, which
    is stored as column headers (‘wide’ format). (This corresponds to an older
    version of the “IAMC format,” without more recent additions intended to represent
    sub-annual time resolution using a separate column.)

    .. todo:: Move to a general location for use with other files in the same format.
    """
    with ZipFile(path) as zf:  # Open the ZIP archive
        with zf.open(name) as f:  # Open a particular member
            # - Read data using upper-case column names; the lower-case names are
            #   applied below.
            # - Drop null rows.
            # - Stack the “year” dimension (‘long’ format), creating a pd.Series.
            # - Apply the index names.
            return (
                pd.read_csv(f, index_col=list(map(str.upper, DIMS[:-1])))
                .rename(columns=lambda c: int(c))
                .dropna(how="all")
                .stack()
                .rename_axis(DIMS)
            )
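
# Illustration (a self-contained sketch of the wide-to-long reshaping performed
# above, using toy data in place of the actual snapshot member):
#
#     import io
#
#     buf = io.StringIO(
#         "MODEL,SCENARIO,REGION,VARIABLE,UNIT,2020,2030\n"
#         "m,s,World,Population,million,7.8,8.5\n"
#     )
#     s = (
#         pd.read_csv(buf, index_col=list(map(str.upper, DIMS[:-1])))
#         .rename(columns=lambda c: int(c))
#         .stack()
#         .rename_axis(DIMS)
#     )
#     # `s` is a pd.Series with one value per (model, …, year) combination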


def _fuzz_data(size=1e2, include: List[Tuple[str, str]] = []):
    """Select a subset of the data for use in testing.

    Parameters
    ----------
    size : numeric
        Number of rows to include.
    include : sequence of 2-tuple (str, str)
        (variable name, unit) to include. The data will be partly duplicated to
        ensure the given variable name(s) are included.
    """
    size = int(size)
    rng = np.random.default_rng()

    # - Select `size` rows at random from the full data set.
    # - Use their index for a new pd.Series with random data.
    # - Convert to pd.DataFrame with upper-case column names.
    # - Drop duplicated indices.
    # - Return to original wide format.
    columns = list(map(str.upper, DIMS))
    dfs = [
        pd.Series(
            rng.random(size), index=get_advance_data().sample(size).index, name="value"
        )
        .rename_axis(columns)
        .reset_index()
        .drop_duplicates(subset=columns)
        .pivot(index=columns[:-1], columns="YEAR", values="value")
    ]

    # Duplicate data for (variable, unit) pairs required per `include`
    for variable, unit in include:
        dfs.append(
            dfs[0]
            .query(f"VARIABLE != {variable!r}")
            .assign(VARIABLE=variable, UNIT=unit)
        )

    # Path for the output archive.
    # For ordinary testing, output to a temporary directory.
    target = local_data_path("test", *LOCATION)
    # To update/overwrite the data file in the repo, uncomment this line:
    # target = package_data_path("test", *LOCATION)
    target.parent.mkdir(exist_ok=True, parents=True)

    # Concatenate the data and write to the target member of the target archive
    with ZipFile(target, "w", ZIP_DEFLATED) as zf:
        with zf.open(NAME, "w") as f:
            pd.concat(dfs).to_csv(f, index=False)
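
# Example usage (a sketch; intended only for generating test fixtures, and the
# (variable, unit) pair shown is hypothetical):
#
#     _fuzz_data(size=100, include=[("Population", "million")])
#     # A reduced archive is written under local_data_path("test", *LOCATION)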