import logging
import sys
from collections.abc import Callable
from typing import TYPE_CHECKING, Any, Optional
import pandas as pd
import sdmx
from genno import Quantity
from genno.compat.sdmx.operator import dataset_to_quantity, quantity_to_message
from sdmx.message import StructureMessage
from sdmx.model.common import Codelist
from sdmx.model.v21 import DataStructureDefinition
from message_ix_models.util import local_data_path
if TYPE_CHECKING:
from genno.types import AnyQuantity
from message_ix_models import Context
log = logging.getLogger(__name__)
#: Dimensions of the PASTA activity data. These are the column names of the CSV file
#: in long format; :func:`pasta_native_to_sdmx` uses them as the index of the data
#: frame, and :func:`generate_pasta_structures` creates one SDMX dimension for each.
PASTA_DIMS = [
    "Region",
    "Vehicle_type",
    "Data",
    "Sector",
    "Scope",
    "Scenario",
    "Year",
]
def pasta_native_to_sdmx() -> "AnyQuantity":
    """Read PASTA activity data from :file:`{[message_local_data]}/edits/pasta.csv`.

    The file :file:`{[message_local_data]}/edits/pasta-data.xml` is created with an
    SDMX-ML formatted version of the data set. Because this function calls
    :func:`generate_pasta_structures`, the file
    :file:`{[message_local_data]}/edits/pasta-structure.xml` is also written.

    If the input CSV file does not exist, its parent directory is created, a message
    is printed, and the process exits with status 1 (:class:`SystemExit`).

    The coordinates of the resulting quantity are printed to standard output.

    Returns
    -------
    .Quantity
        with dimensions :data:`.PASTA_DIMS`.

    See also
    --------
    generate_pasta_structures
    """
    path = local_data_path("edits", "pasta.csv")
    if not path.exists():
        # Create the directory (presumably so the user can place the file there)
        path.parent.mkdir(parents=True, exist_ok=True)
        print(f"Not found: {path}")
        sys.exit(1)

    # - Read the data file.
    # - Rename "Value" to "value", as expected by genno.
    # - Set index.
    df = pd.read_csv(path).rename(columns={"Value": "value"}).set_index(PASTA_DIMS)

    # Convert to genno.Quantity
    q = Quantity(df)

    # Show the dimensions and codes
    print(q.coords)

    # Retrieve an SDMX structure message containing a data structure definition (DSD)
    sm = generate_pasta_structures(q)
    dsd = sm.structure["PASTA"]

    # Convert `q` to an SDMX data message
    msg = quantity_to_message(q, dsd)

    # Serialize first, then write: if serialization fails, an existing output file is
    # not truncated and no empty file is left behind
    xml = sdmx.to_xml(msg, pretty_print=True)
    with open(local_data_path("edits", "pasta-data.xml"), "wb") as f:
        f.write(xml)

    return q
def generate_pasta_structures(
    data: Optional["AnyQuantity"] = None,
) -> "StructureMessage":
    """Generate SDMX data structures for the PASTA activity data flows.

    The file :file:`{[message_local_data]}/edits/pasta-structure.xml` is created or
    updated.

    Parameters
    ----------
    data : optional
        If given, one code list is created for each of its coords (via
        :func:`coords_to_codelists`), and each dimension of the data structure
        definition is enumerated by the code list whose ID is the upper-cased
        dimension ID. If :any:`None`, no code lists are created and the dimensions
        carry no representation.

    Returns
    -------
    sdmx.message.StructureMessage
        containing the data structure definition with ID "PASTA" and, if `data` was
        given, the code lists.

    Raises
    ------
    KeyError
        if `data` is given but lacks a coord for any of :data:`PASTA_DIMS`, so that
        the corresponding code list is missing from the message.
    """
    from sdmx.model.common import Agency, Representation

    # Create a structure message
    msg = StructureMessage()

    ITF = Agency(id="OECD.ITF")

    # Common attributes for SDMX maintainable artefacts generated by this function
    ma_args = dict(
        is_external_reference=False,
        is_final=True,
        maintainer=ITF,
        version="0.1",
    )

    if data is not None:
        # Create code lists for the coords on the data
        for cl in coords_to_codelists(data, **ma_args):
            msg.add(cl)

    # Create a data structure definition (DSD) and add it to the message
    dsd = DataStructureDefinition(id="PASTA", **ma_args)
    msg.add(dsd)

    # Create dimensions within the DSD
    for dim_id in PASTA_DIMS:
        dim = dsd.dimensions.getdefault(id=dim_id)
        if data is not None:
            # Record the codelist that enumerates this dimension. The upper-cased ID
            # matches the default `id_transform` of coords_to_codelists().
            dim.local_representation = Representation(
                enumerated=msg.codelist[dim_id.upper()]
            )

    # Add the measure; currently the well-known SDMX "OBS_VALUE"
    # TODO Change to specific measure for each structure
    dsd.measures.getdefault(id="OBS_VALUE")

    # Serialize first, then write: if serialization fails, an existing output file is
    # not truncated and no empty file is left behind
    xml = sdmx.to_xml(msg, pretty_print=True)
    path = local_data_path("edits", "pasta-structure.xml")
    # Ensure the target directory exists when this function is called directly
    path.parent.mkdir(parents=True, exist_ok=True)
    with open(path, "wb") as f:
        f.write(xml)

    return msg
def coords_to_codelists(
    qty: "AnyQuantity", *, id_transform: Optional[Callable] = str.upper, **kwargs
) -> list["Codelist"]:
    """Convert the coordinates of `qty` to a collection of :class:`.Codelist`.

    .. todo:: Move upstream, to :mod:`genno`.

    Parameters
    ----------
    qty :
        Quantity whose coords are converted. One code list is created per dimension,
        with one code per label; code IDs are the labels converted to :class:`str`.
    id_transform : callable, optional
        Applied to each dimension ID to produce the ID of the code list; by default,
        :meth:`str.upper`. If :any:`None`, or if the callable raises
        :class:`TypeError` for a dimension ID, ``str()`` of the dimension ID is used
        instead.
    **kwargs
        Passed to the :class:`.Codelist` constructor for every code list.

    Returns
    -------
    list of Codelist
        in the order of the dimensions in ``qty.coords``.
    """

    def _transform(value: Any) -> str:
        """Return the code list ID for the dimension ID `value`."""
        if id_transform is None:
            # Explicit no-op, instead of relying on the TypeError from calling None
            return str(value)
        try:
            return id_transform(value)
        except TypeError:
            # For instance, str.upper() applied to a dimension ID that is not a str
            return str(value)

    result = []

    for dim_id, labels in qty.coords.items():
        cl = Codelist(id=_transform(dim_id), **kwargs)
        # One code per label along this dimension
        for label in labels.data:
            cl.setdefault(id=str(label))
        result.append(cl)

    return result
#: Mapping from OECD.ITF "Data" codes to 2-tuples of (quantity name, units) used in
#: :mod:`message_ix_models`. :func:`gen_demand` looks up each "Data" code found in the
#: PASTA data here, then assigns the name and units to the matching subset.
DATA_MAP: dict[str, tuple[str, str]] = {
    "PKM": ("passenger activity", "km"),
    "TKM": ("freight activity", "km"),
    "VKM": ("vehicle activity", "km"),
}
def gen_demand(context: "Context") -> dict[str, "AnyQuantity"]:
    """Generate MESSAGEix-Transport demand data from PASTA.

    The SDMX-ML files :file:`pasta-structure.xml` and :file:`pasta-data.xml` are read
    from the :file:`edits` directory of the local data path; these are the files
    written by :func:`generate_pasta_structures` and :func:`pasta_native_to_sdmx`.

    .. todo:: Convert to an ExoDataSource class; connect to the genno.Computer used for
       MESSAGEix-Transport.

    Parameters
    ----------
    context :
        Currently not used by this function.

    Returns
    -------
    dict
        Mapping from quantity names (the first entries of the values of
        :data:`DATA_MAP`) to quantities. Each holds the subset of the PASTA data for
        one "Data" code, with the name and units from :data:`DATA_MAP` and with
        dimensions renamed.

    Raises
    ------
    KeyError
        if the data contain a "Data" code that is not a key of :data:`DATA_MAP`.
    """
    from genno import Key
    from genno.operator import assign_units, rename_dims, select

    try:
        from genno.operator import rename
    except ImportError:  # genno < 1.26

        def rename(qty: "AnyQuantity", name: str) -> "AnyQuantity":
            """Fallback: set the name of `qty` in place and return it."""
            qty.name = name
            return qty

    # Read the SDMX structures
    sm = sdmx.read_sdmx(local_data_path("edits", "pasta-structure.xml"))

    # - Read the SDMX data.
    # - Convert to genno.Quantity.
    # - Rename dimensions to lower case and matching MESSAGEix-GLOBIOM short names,
    #   where possible.
    # - TODO Aggregate 'scope', 'sector', 'vehicle_type' dimensions.
    # - TODO Interpolate to target periods using ExoDataSource.
    # - TODO Aggregate 'n' dimension from countries to MESSAGEix-GLOBIOM regions using
    #   ExoDataSource.
    q_all = dataset_to_quantity(
        sdmx.read_sdmx(
            local_data_path("edits", "pasta-data.xml"), dsd=sm.structure["PASTA"]
        ).data[0]
    ).pipe(
        rename_dims,
        {
            "Region": "n",
            "Scenario": "scenario",
            "Scope": "scope",
            "Sector": "sector",
            "Vehicle_type": "vt",
            "Year": "y",
        },
    )

    # Separate according to "Data" codes; this dimension is not renamed above
    q = dict()
    for label in map(str, q_all.coords["Data"].data):
        name, units = DATA_MAP[label]

        # - Select only the data with this code.
        # - Assign units.
        # - Assign name.
        q[name] = (
            q_all.pipe(select, indexers={"Data": label}, drop=True)
            .pipe(assign_units, units)
            .pipe(rename, name)
        )

        # Generate the genno key that identifies this quantity
        key = Key(q[name])

        log.info("%s observations for %s", len(q[name]), key)

    return q