"""Tools for IEA (Extended) World Energy Balance (WEB) data."""
import logging
import zipfile
from collections.abc import Iterable
from copy import copy
from pathlib import Path
from typing import TYPE_CHECKING, Any, Literal, Optional
import genno
import pandas as pd
from genno import Key
from genno.core.key import single_key
from genno.operator import concat
from platformdirs import user_cache_path
from message_ix_models.model.structure import get_codelist
from message_ix_models.tools.exo_data import ExoDataSource, register_source
from message_ix_models.util import cached, package_data_path, path_fallback
from message_ix_models.util._logging import silence_log
import os
import genno
from genno.types import AnyQuantity
from message_ix_models.types import KeyLike
from message_ix_models.util.common import MappingAdapter
log = logging.getLogger(__name__)
#: ISO 3166-1 alpha-3 codes for 'COUNTRY' codes appearing in the 2024 edition. This
#: mapping includes only values that are not matched by :func:`pycountry.lookup`. See
#: :func:`.iso_3166_alpha_3`.
"CONGO": "COD", # Override lookup(…) → COG
"UAE": "ARE",
"UK": "GBR",
#: Dimensions of the data.
#: Mapping from (provider, year, time stamp) → set of file name(s) containing data.
("IEA", "2024"): ( # Timestamped 20240725T0830
("IEA", "2023"): ("WBIG1.zip", "WBIG2.zip"), # Timestamped 20230726T0014
("OECD", "2021"): ("cac5fa90-en.zip",), # Timestamped 20211119T1000
("OECD", "2022"): ("372f7e29-en.zip",), # Timestamped 20230406T1000
("OECD", "2023"): ("8624f431-en.zip",), # Timestamped 20231012T1000
#: Location of :data:`.FILES`; :py:`where=` argument to :func:`.path_fallback`.
#: .. todo:: Change to :py:`"local test"`` after adjusting :file:`transport.yaml`
#: workflow in :mod:`.message_data`.
WHERE = "local private test"
class IEA_EWEB(ExoDataSource):
"""Provider of exogenous data from the IEA Extended World Energy Balances.
To use data from this source, call :func:`.exo_data.prepare_computer` with the
following `source_kw`:
- :py:`provider`: Either 'IEA' or 'OECD'. See :data:`.FILES`.
- :py:`edition`: one of '2021', '2022', or '2023'. See :data:`.FILES`.
- :py:`product` (optional): :class:`str` or :class:`list` of :class:`str`. Select
only these labels from the 'PRODUCT' dimension.
- :py:`flow` (optional): :class:`str` or :class:`list` of :class:`str`. Select only
these labels from the 'FLOW' dimension.
- :py:`transform` (optional): either "A" (default) or "B". See :meth:`.transform`.
- :py:`regions`: **must** also be given with the value :py:`"R12"` if giving
>>> keys = prepare_computer(
... context,
... computer,
... source="IEA_EWEB",
... source_kw=dict(
... provider="OECD", edition="2022", product="CHARCOAL", flow="RESIDENT"
... ),
... )
>>> result = computer.get(keys[0])
id = "IEA_EWEB"
key = Key("energy:n-y-product-flow:iea")
def __init__(self, source, source_kw):
"""Initialize the data source."""
if source != self.id:
raise ValueError(source)
_kw = copy(source_kw)
p = self.provider = _kw.pop("provider", None)
e = self.edition = _kw.pop("edition", None)
files = FILES[(p, e)]
except KeyError:
raise ValueError(f"No IEA data files for (provider={p!r}, edition={e!r})")
self.indexers = dict(MEASURE="TJ")
if product := _kw.pop("product", None):
if flow := _kw.pop("flow", None):
# Handle the 'transform' keyword
self.transform_method = _kw.pop("transform", "A")
regions = _kw.pop("regions", None)
if self.transform_method not in "AB":
raise ValueError(f"transform={self.transform_method!r}")
elif self.transform_method == "B":
if (p, e) != ("IEA", "2024"):
raise ValueError(
f"transform='B' only supported for (provider='IEA', "
f"edition='2024'); got {(p, e)!r}"
elif regions != "R12":
raise ValueError(
f"transform='B' only supported for regions='R12'; got {regions!r}"
if len(_kw):
raise ValueError(_kw)
# Identify a location that contains the files for the given (provider, edition)
# Parent directory relative to which `files` are found
self.path = dir_fallback("iea", files[0], where=WHERE)
def __call__(self):
"""Load and process the data."""
# - Load the data.
# - Convert to pd.Series, then genno.Quantity.
# - Map dimensions.
# - Apply `indexers` to select.
return (
provider=self.provider, edition=self.edition, path=self.path
.rename({"COUNTRY": "n", "TIME": "y", "FLOW": "flow", "PRODUCT": "product"})
.sel(self.indexers, drop=True)
def fwf_to_csv(path: Path, progress: bool = False) -> Path: # pragma: no cover
"""Convert the IEA fixed-width file format to CSV.
This appears to operate at about 900k lines / second, about 1 minute for the IEA
2023 .TXT files. This is faster than doing full pandas I/O, which takes 5–10 minutes
depending on formats.
import io
import re
# Output path
path_out = path.with_suffix(".csv")
if path_out.exists() and path_out.stat().st_mtime > path.stat().st_mtime:
log.info(f"Skip conversion; file exists and is newer than source: {path_out}")
return path_out
# Input and output buffers; read the entire file into memory immediately
file_in = io.BytesIO(path.read_bytes())
file_out = io.BytesIO()
# Regular expression to split lines
expr = re.compile(b" +")
if progress:
from tqdm import tqdm
iterator: Iterable[bytes] = tqdm(file_in, desc=f"{path} → {path_out}")
iterator = file_in
# Convert to CSV
for line in iterator:
# Write to file
return path_out
def unpack_zip(path: Path) -> Path:
"""Unpack a ZIP archive."""
cache_dir = user_cache_path("message-ix-models", ensure_exists=True).joinpath("iea")
log.info(f"Decompress {path} to {cache_dir}")
with zipfile.ZipFile(path) as zf:
members = zf.infolist()
assert 1 == len(members)
zi = members[0]
# Candidate path for the extracted file
target = cache_dir.joinpath(zi.filename)
if target.exists() and target.stat().st_size >= zi.file_size:
log.info(f"Skip extraction of {target}")
return target
return Path(zf.extract(members[0], path=cache_dir))
def iea_web_data_for_query(
base_path: Path, *filenames: str, query_expr: str
) -> pd.DataFrame:
"""Load data from `base_path` / `filenames` in IEA WEB formats."""
import dask.dataframe as dd
names_to_read = [] # Filenames to pass to dask.dataframe
# Keyword arguments for read_csv()
# - Certain values appearing in (IEA, 2024) are mapped to NaN.
# - The Value column is numeric.
args: dict[str, Any] = dict(
dtype={"Value": float},
na_values=[".. ", "c ", "x "],
# Iterate over origin filenames
for filename in filenames:
path = base_path.joinpath(filename)
if path.suffix == ".zip":
path = unpack_zip(path)
if path.suffix == ".TXT": # pragma: no cover
names_to_read.append(fwf_to_csv(path, progress=True))
args.update(header=None, names=DIMS + ["Value"])
args.update(header=0, usecols=DIMS + ["Value"])
with silence_log("fsspec.local"):
ddf = dd.read_csv(names_to_read, engine="pyarrow", **args)
ddf = ddf[ddf["MEASURE"] == "TJ"]
# NB compute() must precede query(), else "ValueError: The columns in the
# computed data do not match the columns in the provided metadata" occurs with
# the CSV-formatted data.
result = ddf.compute().query(query_expr).dropna(subset=["Value"])
log.info(f"{len(result)} observations")
return result
def load_data(
provider: str,
edition: str,
query_expr="MEASURE == 'TJ' and TIME >= 1980",
path: Optional[Path] = None,
) -> pd.DataFrame:
"""Load data from the IEA World Energy Balances.
provider : str
First entry in :data:`.FILES`.
edition : str
Second entry in :data:`.FILES`.
query_expr : str, optional
Used with :meth:`pandas.DataFrame.query` to reduce the returned data.
base_path : os.Pathlike, optional
Path containing :data:`.FILES`. If not provided, locations within
:mod:`message_data` or :mod:`message_ix_models` are used.
The data frame has one column for each of :data:`.DIMS`, plus "Value".
path = path or package_data_path("test", "iea")
if "test" in path.parts:
log.warning(f"Reading random data from {path}")
return iea_web_data_for_query(
path, *FILES[(provider, edition)], query_expr=query_expr
def generate_code_lists(
provider: str, edition: str, output_path: Optional["os.PathLike"] = None
) -> None:
"""Extract structure from the data itself."""
import sdmx.model.v21 as m
from message_ix_models.util.sdmx import register_agency, write
output_path = output_path or package_data_path("sdmx")
IEA = m.Agency(
name="International Energy Agency",
# Read the data
files = FILES[(provider, edition)]
path = dir_fallback("iea", files[0], where=WHERE)
data = iea_web_data_for_query(path, *files, query_expr="TIME > 0")
for concept_id in ("COUNTRY", "FLOW", "PRODUCT"):
# Create a code list with the unique values from this dimension
cl = m.Codelist(id=f"{concept_id}_{provider}", maintainer=IEA, version=edition)
m.Code(id=code_id) for code_id in sorted(data[concept_id].dropna().unique())
write(cl, output_path)
def dir_fallback(*parts, **kwargs) -> Path:
"""Return path to the directory that *contains* a particular file.
If the last of `parts` is a string with path separators (for instance "a/b/c"), this
function returns a parent of the path returned by :func:`.path_fallback`, in which
this part is located.
f = Path(parts[-1])
return path_fallback("iea", f, where=WHERE).parents[len(f.parts) - 1]
def get_mapping(provider: str, edition: str) -> "MappingAdapter":
"""Return a Mapping Adapter from codes appearing in IEA EWEB data.
For each code in the IEA 'COUNTRY' code list for (`provider`, `edition`) that is a
country name, the adapter maps the name to a corresponding ISO 3166-1 alpha-3 code.
:data:`COUNTRY_NAME` is used for values particular to IEA EWEB.
Using the adapter makes data suitable for aggregation using the
:mod:`message_ix_models` ``node`` code lists, which include those alpha-3 codes as
children of each region code.
from message_ix_models.util import MappingAdapter, pycountry
from message_ix_models.util.sdmx import read
maps: dict[str, list[tuple[str, str]]] = dict()
for concept, dim in ("COUNTRY", "n"), ("FLOW", "flow"), ("PRODUCT", "product"):
if concept == "COUNTRY":
cl = read(f"IEA:{concept}_{provider}({edition})")
maps[dim] = list()
for code in cl:
new_id = pycountry.iso_3166_alpha_3(code.id) or code.id
maps[dim].append((code.id, new_id))
return MappingAdapter(maps)
def get_node_groups_B() -> dict[Literal["n"], dict[str, list[str]]]:
"""Return groups for aggregating on |n| as part of the :py:`transform='B'` method.
These are of three kinds:
1. For the nodes 'R12_FSU', 'R12_PAO', 'R12_RCPA', 'R12_WEU', the common :ref:`R12`
is used.
2. For the nodes 'R12_CHN', 'R12_MEA', 'R12_LAM', 'R12_NAM', the labels in the
``material-region`` annotation are used. These may reference certain labels
specific to IEA EWEB; omit certain alpha-3 codes present in (1); or both.
3. For the nodes 'R12_AFR', 'R12_PAS', 'R12_SAS', a mix of the codes generated by
:func:`transform_B` and alpha-3 codes are used.
.. note:: This function mirrors the behaviour of code that is not present in
:mod:`message_ix_models` using a file :file:`R12_SSP_V1.yaml` that is also not
present. See
`iiasa/message-ix-models#201 <https://github.com/iiasa/message-ix-models/pull/201#pullrequestreview-2144852265>`_
for a detailed discussion.
See also
result = dict(
cl = get_codelist("node/R12")
for n in "R12_FSU", "R12_PAO", "R12_RCPA", "R12_WEU":
result[n] = [c.id for c in cl[n].child]
for n in "R12_CHN", "R12_MEA", "R12_EEU", "R12_LAM", "R12_NAM":
result[n] = cl[n].eval_annotation(id="material-region")
return dict(n=result)