Source code for message_ix_models.project.edits

import logging
import sys
from collections.abc import Callable
from typing import TYPE_CHECKING, Any, Optional

import pandas as pd
import sdmx
from genno import Quantity
from genno.compat.sdmx.operator import dataset_to_quantity, quantity_to_message
from sdmx.message import StructureMessage
from sdmx.model.common import Codelist
from sdmx.model.v21 import DataStructureDefinition

from message_ix_models.util import local_data_path

if TYPE_CHECKING:
    from genno.types import AnyQuantity

    from message_ix_models import Context

log = logging.getLogger(__name__)

#: Dimensions of the PASTA activity data. These are the names of the columns in the
#: long-format CSV file that together identify each observation.
PASTA_DIMS = [
    "Region",
    "Vehicle_type",
    "Data",
    "Sector",
    "Scope",
    "Scenario",
    "Year",
]


def pasta_native_to_sdmx() -> "AnyQuantity":
    """Load the PASTA activity data and store an SDMX-ML copy of it.

    The input is :file:`{[message_local_data]}/edits/pasta.csv`. As a side effect,
    the file :file:`{[message_local_data]}/edits/pasta-data.xml` is written with an
    SDMX-ML formatted version of the same data set, using the data structure
    definition obtained from :func:`generate_pasta_structures`.

    If the input file does not exist, its parent directory is created, a message is
    printed, and the process exits with status 1.

    Returns
    -------
    .Quantity
        with dimensions :data:`.PASTA_DIMS`.

    See also
    --------
    generate_pasta_structures
    """
    csv_path = local_data_path("edits", "pasta.csv")

    if not csv_path.exists():
        # Make sure the directory in which the file is expected exists, then stop
        csv_path.parent.mkdir(parents=True, exist_ok=True)
        print(f"Not found: {csv_path}")
        sys.exit(1)

    # Read the data file
    frame = pd.read_csv(csv_path)
    # Rename "Value" to "value", as expected by genno
    frame = frame.rename(columns={"Value": "value"})
    # Use the PASTA dimensions as the index
    frame = frame.set_index(PASTA_DIMS)

    # Convert to genno.Quantity
    qty = Quantity(frame)

    # Show the dimensions and codes
    print(qty.coords)

    # Retrieve an SDMX structure message and pick out the data structure definition
    # (DSD) for the PASTA data
    structures = generate_pasta_structures(qty)
    pasta_dsd = structures.structure["PASTA"]

    # Convert the quantity to an SDMX data message
    data_message = quantity_to_message(qty, pasta_dsd)

    # Write to file
    xml_path = local_data_path("edits", "pasta-data.xml")
    with open(xml_path, "wb") as stream:
        stream.write(sdmx.to_xml(data_message, pretty_print=True))

    return qty
def generate_pasta_structures(
    data: Optional["AnyQuantity"] = None,
) -> "StructureMessage":
    """Generate SDMX data structures for the PASTA activity data flows.

    The file :file:`{[message_local_data]}/edits/pasta-structure.xml` is created or
    updated.

    Parameters
    ----------
    data : optional
        If given, one code list is created for each of its coordinates (using
        :func:`coords_to_codelists`) and each dimension of the data structure
        definition is enumerated by the code list with the matching, upper-cased ID.
        If omitted, the dimensions are created without any representation.

    Returns
    -------
    StructureMessage
        containing the data structure definition with ID "PASTA" and, if `data` was
        given, the code lists.

    Raises
    ------
    KeyError
        presumably, if `data` is given but lacks one of the :data:`.PASTA_DIMS`, since
        the corresponding code list is then looked up but was never added — confirm
        against the behaviour of :attr:`StructureMessage.codelist`.
    """
    from sdmx.model.common import Agency, Representation

    # Create a structure message
    msg = StructureMessage()

    ITF = Agency(id="OECD.ITF")

    # Common attributes for SDMX maintainable artefacts generated by this function
    ma_args = dict(
        is_external_reference=False,
        is_final=True,
        maintainer=ITF,
        version="0.1",
    )

    if data is not None:
        # Create code lists for the coords on the data
        for cl in coords_to_codelists(data, **ma_args):
            msg.add(cl)

    # Create a data structure definition (DSD) and add it to the message
    dsd = DataStructureDefinition(id="PASTA", **ma_args)
    msg.add(dsd)

    # Create dimensions within the DSD
    for dim_id in PASTA_DIMS:
        dim = dsd.dimensions.getdefault(id=dim_id)
        if data is not None:
            # Record the codelist that enumerates this dimension. The upper-cased ID
            # matches the default `id_transform` of coords_to_codelists().
            dim.local_representation = Representation(
                enumerated=msg.codelist[dim_id.upper()]
            )

    # Add the measure; currently the well-known SDMX "OBS_VALUE"
    # TODO Change to specific measure for each structure
    dsd.measures.getdefault(id="OBS_VALUE")

    # Write to file
    with open(local_data_path("edits", "pasta-structure.xml"), "wb") as f:
        f.write(sdmx.to_xml(msg, pretty_print=True))

    return msg
def coords_to_codelists(
    qty: "AnyQuantity", *, id_transform: Optional[Callable] = str.upper, **kwargs
) -> list["Codelist"]:
    """Convert the coordinates of `qty` to a collection of :class:`.Codelist`.

    One code list is created per dimension of `qty`, in the order of
    ``qty.coords``. Each label along the dimension becomes a code whose ID is the
    :class:`str` form of the label.

    Parameters
    ----------
    qty :
        Quantity whose coordinates are converted.
    id_transform : callable, optional
        Applied to each dimension ID to produce the ID of the code list; by default
        :meth:`str.upper`. If :obj:`None`, or if the call raises :class:`TypeError`
        (for instance :meth:`str.upper` on a non-string ID), ``str(dim_id)`` is used
        instead.
    **kwargs
        Passed to the :class:`.Codelist` constructor for every code list.

    Returns
    -------
    list of Codelist

    .. todo:: Move upstream, to :mod:`genno`.
    """

    def _transform(value: Any) -> str:
        # No transform requested: use the plain string form of the ID
        if id_transform is None:
            return str(value)

        try:
            return id_transform(value)
        except TypeError:
            # The transform does not accept this type of value
            return str(value)

    result = []

    for dim_id, labels in qty.coords.items():
        cl = Codelist(id=_transform(dim_id), **kwargs)

        # Add one code per label along this dimension
        for label in labels.data:
            cl.setdefault(id=str(label))

        result.append(cl)

    return result
#: Mapping from OECD.ITF "Data" codes to 2-tuples of (:mod:`message_ix_models`
#: quantity name, units).
DATA_MAP = dict(
    PKM=("passenger activity", "km"),
    TKM=("freight activity", "km"),
    VKM=("vehicle activity", "km"),
)
def gen_demand(context: "Context") -> dict[str, "AnyQuantity"]:
    """Generate MESSAGEix-Transport demand data from PASTA.

    The SDMX structures and data are read from the files
    :file:`{[message_local_data]}/edits/pasta-structure.xml` and
    :file:`{[message_local_data]}/edits/pasta-data.xml`, and the data are split
    according to the codes on the "Data" dimension.

    Parameters
    ----------
    context :
        Currently not used.

    Returns
    -------
    dict
        Mapping from quantity name (first entries of the :data:`.DATA_MAP` values) to
        the quantity with the data for the corresponding "Data" code.

    Raises
    ------
    KeyError
        if the data contain a "Data" code that is not in :data:`.DATA_MAP`.

    .. todo:: Convert to an ExoDataSource class; connect to the genno.Computer used
       for MESSAGEix-Transport.
    """
    from genno import Key
    from genno.operator import assign_units, rename_dims, select

    try:
        from genno.operator import rename
    except ImportError:  # genno < 1.26

        def rename(qty: "AnyQuantity", name: str) -> "AnyQuantity":
            qty.name = name
            return qty

    # Read the SDMX structures
    sm = sdmx.read_sdmx(local_data_path("edits", "pasta-structure.xml"))

    # - Read the SDMX data.
    # - Convert to genno.Quantity.
    # - Rename dimensions to lower case and matching MESSAGEix-GLOBIOM short names,
    #   where possible.
    # - TODO Aggregate 'scope', 'sector', 'vehicle_type' dimensions.
    # - TODO Interpolate to target periods using ExoDataSource.
    # - TODO Aggregate 'n' dimension from countries to MESSAGEix-GLOBIOM regions using
    #   ExoDataSource.
    q_all = dataset_to_quantity(
        sdmx.read_sdmx(
            local_data_path("edits", "pasta-data.xml"), dsd=sm.structure["PASTA"]
        ).data[0]
    ).pipe(
        rename_dims,
        {
            "Region": "n",
            "Scenario": "scenario",
            "Scope": "scope",
            "Sector": "sector",
            "Vehicle_type": "vt",
            "Year": "y",
        },
    )

    # Separate according to "Data" codes; this dimension is not renamed above
    q = dict()
    for label in map(str, q_all.coords["Data"].data):
        name, units = DATA_MAP[label]

        # - Select only the data with this code.
        # - Assign units.
        # - Assign name.
        q[name] = (
            q_all.pipe(select, indexers={"Data": label}, drop=True)
            .pipe(assign_units, units)
            .pipe(rename, name)
        )

        # Generate the genno key that identifies this quantity
        key = Key(q[name])

        log.info(f"{len(q[name])} observations for {key}")

    return q