import logging
from collections.abc import Callable, Hashable
from functools import cache
from typing import TYPE_CHECKING
import numpy as np
import pandas as pd
import pytest
from genno import Computer
from message_ix import Scenario
from message_ix_models.model.transport.testing import MARK as MARK_TRANSPORT
from message_ix_models.project.ssp.transport import (
METHOD,
get_computer,
get_scenario_code,
process_df,
process_file,
track_GAINS,
v_to_emi_coords,
v_to_fe_coords,
)
from message_ix_models.testing import MARK
from message_ix_models.tools.iea import web
from message_ix_models.util import package_data_path
if TYPE_CHECKING:
import pathlib
log = logging.getLogger(__name__)

#: Values used to parametrize tests over `method`; METHOD.C is wrapped in
#: :func:`pytest.param` carrying ``MARK[0]``.
METHOD_PARAM = (METHOD.A, METHOD.B, pytest.param(METHOD.C, marks=MARK[0]))

# Test data file names
V1 = "SSP_dev_SSP2_v0.1_Blv0.18_baseline_prep_lu_bkp_solved_materials_2025_macro.csv"
V2 = "SSP_LED_v2.3.1_baseline.csv"
V3 = "SSP_SSP2_v6.2_SSP2_-_Low_Emissions.csv"

# Bit flags for the data flow(s) in which a code from VARIABLE appears
IN_ = 0b01  # Data appears in the input file only
OUT = 0b10  # Data appears in the output file *and* with a modified magnitude
I_O = IN_ | OUT  # Both

#: Emissions species codes appearing in the IAMC-structured / reported data; these are
#: *different* from those internal to the model.
SPECIES = {"CH4", "BC", "CO", "CO2", "N2O", "NH3", "NOx", "OC", "Sulfur", "VOC"}

#: Species for which no aviation-specific emission factor values are available.
SPECIES_WITHOUT_EF: set[str] = set()
def check(df_in: pd.DataFrame, df_out: pd.DataFrame, method: METHOD) -> None:
    """Shared assertions for :func:`test_process_df` and :func:`test_process_file`.

    Both data frames are given in 'wide' layout: the first 5 columns of `df_in` are
    treated as dimensions, and every other column is melted into a 'Year' dimension
    with integer labels.
    """
    # Dimension columns: first for the 'wide' layout, then for the 'long' layout
    wide_dims = list(df_in.columns)[:5]
    long_dims = [*wide_dims, "Year"]

    def _melt_sorted(wide: pd.DataFrame) -> pd.DataFrame:
        """Return `wide` in long layout with integer years, sorted by dimensions."""
        melted = wide.melt(wide_dims, var_name=long_dims[-1])
        return melted.astype({long_dims[-1]: int}).sort_values(long_dims)

    # df_out.to_csv("debug-out.csv")  # DEBUG Dump to file
    df_in, df_out = _melt_sorted(df_in), _melt_sorted(df_out)

    # The variable codes to be modified are already present in the input data
    assert expected_variables(IN_, method) <= set(df_in["Variable"].unique())

    regions_in = set(df_in["Region"].unique())

    # Directory from which IEA EWEB data is read
    iea_eweb_dir = web.dir_fallback(
        web.FILES[("IEA", "2024")][0], where=web.IEA_EWEB._where()
    )

    # True if the fuzzed test data are being used
    iea_eweb_test_data = iea_eweb_dir.match("message_ix_models/data/test/iea/web")
    log.info(f"{iea_eweb_test_data = }")

    # (number of added values, number of modified values), keyed by
    # (method, whether the test data are in use)
    expected_counts = {
        (METHOD.A, False): (0, 10280),
        (METHOD.A, True): (0, 10280),
        (METHOD.B, False): (13 * 20, 10120),
        (METHOD.B, True): (10 * 20, 7720),
        (METHOD.C, False): (13 * 20, 7000),
        (METHOD.C, True): (13 * 20, 7000),
    }
    n_added, n_modified = expected_counts[(method, iea_eweb_test_data)]

    # Output is longer than input by exactly the number of added values
    assert len(df_out) == len(df_in) + n_added

    # Output has the same set of region codes as input
    assert set(df_out["Region"].unique()) == regions_in

    # Diff the data:
    # - Outer merge on all dimensions.
    # - Fill NaNs resulting from insert_nans().
    # - Keep only rows where input and output differ by more than a tiny amount.
    merged = df_in.merge(df_out, how="outer", on=long_dims, suffixes=("_in", "_out"))
    changed = merged.fillna(0).query("abs(value_out - value_in) > 1e-16")

    # At least the expected number of regions (including "World") have modified values
    min_regions = {METHOD.A: 13, METHOD.B: 9, METHOD.C: 13}[method]
    assert len(changed["Region"].unique()) >= min_regions

    # NB The exact number of modified values is currently not asserted; the check is
    #    retained here, disabled, for debugging.
    # if n_modified != len(changed):  # DEBUG
    #     changed.to_csv("debug-diff.csv")  # Dump to file
    #     print(changed.to_string(max_rows=50))  # Show in test output
    # msg = f"Unexpected number of modified values: {n_modified} != {len(changed)}"
    # assert n_modified == len(changed), msg

    # Exactly the expected 'Variable' codes have been modified
    assert expected_variables(OUT, method) == set(changed["Variable"].unique())

    # Negative output values are tolerated only when the fuzzed test data are in use
    negative = changed.query("value_out < 0")
    if len(negative) > 0:
        msg = "Negative emissions totals after processing"
        print(f"\n{msg}:", negative.to_string(), sep="\n")
        assert iea_eweb_test_data, msg
@cache
def expected_variables(flag: int, method: METHOD) -> set[str]:
    """Return the 'Variable' codes expected for the data flow(s) `flag` and `method`.

    `flag` is a combination of :data:`IN_` and :data:`OUT`. Results are memoized per
    (`flag`, `method`) pair, so callers share the returned set and should not mutate
    it.
    """
    # Shorthand for the sector parts of the emissions codes
    bunkers = "Energy|Demand|Bunkers"
    transport = "Energy|Demand|Transportation"

    codes: set[str] = set()

    # Emissions
    for species in SPECIES:
        # Data flows in which codes for this species are expected: input only if no
        # emission factor is available and the method is not A; otherwise both
        if species in SPECIES_WITHOUT_EF and method != METHOD.A:
            flows = IN_
        else:
            flows = I_O

        if not flag & flows:
            continue

        codes.add(f"Emissions|{species}|{bunkers}")
        codes.add(f"Emissions|{species}|{bunkers}|International Aviation")

        if method == METHOD.C:
            codes.add(f"Emissions|{species}|{transport}")
            # NB Present up to input data format V2; not in V3
            # codes.add(
            #     f"Emissions|{species}|{transport}|Road Rail and Domestic Shipping"
            # )

    # Final Energy
    if method != METHOD.A:
        codes.update(
            (
                "Final Energy|Bunkers",
                "Final Energy|Bunkers|Liquids|Oil",
                "Final Energy|Transportation",
                "Final Energy|Transportation|Liquids|Oil",
            )
        )
        if flag & OUT:
            codes.add("Final Energy|Bunkers|International Aviation")

    return codes
def insert_nans(
    df: pd.DataFrame, variable_expr: str, year_cond: Callable[[Hashable], bool]
) -> pd.DataFrame:
    """Replace zeros with :py:`np.nan` in `df`.

    This occurs only where:

    1. The 'Variable' column contains a string that matches `variable_expr`.
    2. The `year_cond` returns :any:`True` for the column name.

    Rows with a missing 'Variable' entry are treated as non-matching and are left
    unchanged.

    Parameters
    ----------
    df :
        Data with a 'Variable' column of strings.
    variable_expr :
        Regular expression; it must match the *entire* 'Variable' string.
    year_cond :
        Called with each column label of `df`; zeros are replaced only in columns for
        which it returns :any:`True`.

    Returns
    -------
    pandas.DataFrame
        A new data frame; `df` itself is not modified.
    """
    # Rows to keep as-is: those whose 'Variable' does not fully match the expression.
    # na=False makes a missing 'Variable' count as "no match"; without it the result
    # would contain NaN, which cannot be inverted with ~.
    keep = ~df.Variable.str.fullmatch(variable_expr, na=False)

    # Copy of `df` with 0 → NaN, but only in the columns selected by `year_cond`
    selected_columns = [c for c in df.columns if year_cond(c)]
    zeros_as_nan = df.replace({c: {0: np.nan} for c in selected_columns})

    # Take values from `zeros_as_nan` for matching rows, from `df` for all others
    return df.where(keep, zeros_as_nan)
@get_computer.minimum_version
def test_cli(tmp_path, mix_models_cli, test_context, input_xlsx_path) -> None:
    """Code can be invoked from the command-line."""
    from shutil import copyfile

    # Work on a copy of the input file inside the temporary directory
    path_in = tmp_path / input_xlsx_path.name
    copyfile(input_xlsx_path, path_in)

    # Code runs
    command = ["ssp", "transport", "--method=A", str(path_in)]
    result = mix_models_cli.invoke(command)
    assert 0 == result.exit_code, result.output

    # Output path was determined automatically and exists
    path_out = path_in.with_name(path_in.stem + "_out.xlsx")
    assert path_out.exists()

    # Messages were printed about file handling
    expected_messages = (
        "Convert Excel input to ",
        "No PATH_OUT given; write to ",
        "Convert CSV output to ",
    )
    for message in expected_messages:
        assert message in result.output
@pytest.mark.parametrize(
    "expected_id, model_name, scenario_name",
    [
        (code_id, model, scenario)
        # Each entry: (expected code ID, model name, scenario names for that model)
        for code_id, model, scenarios in (
            ("LED-SSP1", "SSP_LED_v2.3.1", ("baseline_1000f", "baseline")),
            ("LED-SSP2", "SSP_LED_v2.3.1", ("SSP2 - Very Low Emissions",)),
            (
                "SSP1",
                "SSP_SSP1_v2.3.1",
                (
                    "baseline_1000f",
                    "baseline",
                    "SSP1 - Low Emissions",
                    "SSP1 - Very Low Emissions",
                ),
            ),
            (
                "SSP2",
                "SSP_SSP2_v2.4.1",
                (
                    "baseline_1000f",
                    "baseline",
                    "SSP2 - Low Emissions",
                    "SSP2 - Low Overshoot",
                    "SSP2 - Medium Emissions",
                    "SSP2 - Medium-Low Emissions",
                ),
            ),
            ("SSP3", "SSP_SSP3_v2.4.1", ("baseline_1000f", "SSP3 - High Emissions")),
            (
                "SSP4",
                "SSP_SSP4_v2.3.1",
                ("baseline_1000f", "baseline", "SSP4 - Low Overshoot"),
            ),
            (
                "SSP5",
                "SSP_SSP5_v2.4.1",
                ("baseline_1000f", "SSP5 - High Emissions", "SSP5 - Low Overshoot"),
            ),
        )
        for scenario in scenarios
    ],
)
def test_get_scenario_code(expected_id, model_name, scenario_name) -> None:
    """:func:`get_scenario_code` returns a code with the expected ID."""
    code = get_scenario_code(model_name, scenario_name)
    assert code.id == expected_id
@MARK["#375"]
@MARK_TRANSPORT[10]
@get_computer.minimum_version
@pytest.mark.parametrize("method", METHOD_PARAM)
def test_process_df(pytestconfig, test_context, input_csv_path, method) -> None:
    """Test process_df().

    For METHOD.C, the --ixmp-user-config CLI option **must** be given.
    """

    def _year_2020_or_later(column) -> bool:
        """Return True for column labels that are numeric and at least 2020."""
        return str(column).isnumeric() and int(column) >= 2020

    # - Read input data
    # - Replace some 0 values with np.nan to replicate conditions in calling code.
    raw = pd.read_csv(input_csv_path)
    df_in = insert_nans(
        raw, r"Emissions\|.*\|International Aviation", _year_2020_or_later
    )

    # With --ixmp-user-config, no platform name is passed; otherwise the test
    # platform is used
    platform_name = None
    if not pytestconfig.option.ixmp_user_config:
        mp = test_context.get_platform()
        platform_name = mp.name

        # NB Here we might create the particular scenario that process_df() expects
        #    for METHOD.C, but this includes a specific version, and it is not
        #    possible to create a specific version on JDBCBackend
        scenario = Scenario(
            mp, "MESSAGEix-GLOBIOM 1.1-T-R12 ci nightly", "SSP_2024.2 baseline", "new"
        )
        del scenario

    # Code runs
    df_out = process_df(df_in, method=method, platform_name=platform_name)

    # Output satisfies expectations
    check(df_in, df_out, method)
@get_computer.minimum_version
@MARK_TRANSPORT[10]
@pytest.mark.parametrize("method", METHOD_PARAM)
def test_process_file(tmp_path, test_context, input_csv_path, method) -> None:
    """Code can be called from Python."""
    # Output goes to a file in the temporary directory
    path_out = tmp_path / "output.csv"

    # Code runs
    process_file(path_in=input_csv_path, path_out=path_out, method=method)

    # Output path exists
    assert path_out.exists()

    # Contents of the input and output files satisfy expectations
    check(pd.read_csv(input_csv_path), pd.read_csv(path_out), method)
def test_track_GAINS() -> None:
    """Coverage-only test of :func:`.track_GAINS`."""
    # Function runs without error on an empty Computer
    result = track_GAINS(Computer())

    # Result has exactly the dimensions e, n, and y
    assert {"e", "n", "y"} == set(result.dims)

    # TODO Extend with further assertions
@pytest.mark.parametrize(
    "value, exp",
    (
        ("Emissions|CH4", dict(e="CH4", s="_T", t="_T")),
        ("Emissions|CH4|Energy", dict(e="CH4", s="Energy", t="_T")),
        ("Emissions|CH4|Energy|Demand", dict(e="CH4", s="Energy|Demand", t="_T")),
        (
            "Emissions|CH4|Energy|Combustion",
            dict(e="CH4", s="Energy|Combustion", t="_T"),
        ),
        (
            "Emissions|CH4|Fossil Fuels and Industry",
            dict(e="CH4", s="Fossil Fuels and Industry", t="_T"),
        ),
        (
            "Emissions|CH4|Energy|Demand|Bunkers",
            dict(e="CH4", s="Energy|Demand", t="Bunkers"),
        ),
        (
            "Emissions|CH4|Energy|Demand|Transportation|Foo",
            dict(e="CH4", s="Energy|Demand", t="Transportation|Foo"),
        ),
    ),
)
def test_v_to_emi_coords(value: str, exp) -> None:
    """:func:`v_to_emi_coords` returns the expected e/s/t mapping for `value`."""
    assert v_to_emi_coords(value) == exp
@pytest.mark.parametrize(
    "value, exp",
    (
        ("Final Energy|Bunkers", dict(c="", t="Bunkers")),
        (
            "Final Energy|Bunkers|International Aviation",
            dict(c="", t="Bunkers|International Aviation"),
        ),
        ("Final Energy|Bunkers|Liquids|Oil", dict(c="Liquids|Oil", t="Bunkers")),
        (
            "Final Energy|Transportation (w/ bunkers)",
            dict(c="", t="Transportation (w/ bunkers)"),
        ),
        (
            "Final Energy|Transportation (w/ bunkers)|Liquids|Oil",
            dict(c="Liquids|Oil", t="Transportation (w/ bunkers)"),
        ),
        ("Final Energy|Transportation", dict(c="", t="Transportation")),
        (
            "Final Energy|Transportation|Liquids|Oil",
            dict(c="Liquids|Oil", t="Transportation"),
        ),
    ),
)
def test_v_to_fe_coords(value: str, exp) -> None:
    """:func:`v_to_fe_coords` returns the expected c/t mapping for `value`."""
    assert v_to_fe_coords(value) == exp