# Source code for message_ix_models.tests.report.test_operator

import os
import re
from collections.abc import Iterator
from copy import deepcopy
from pathlib import Path

import ixmp
import message_ix
import pandas as pd
import pandas.testing as pdt
import pytest
import xarray as xr
from genno import Computer, Quantity
from ixmp.testing import assert_logs
from message_ix.testing import make_dantzig

from message_ix_models import Context, ScenarioInfo
from message_ix_models.model.structure import get_codes
from message_ix_models.report.operator import (
    compound_growth,
    filter_ts,
    from_url,
    get_ts,
    gwp_factors,
    latest_reporting,
    make_output_path,
    model_periods,
    remove_ts,
    share_curtailment,
    summarize,
)


@pytest.fixture
def c() -> Computer:
    """Provide a newly constructed :class:`genno.Computer` with no arguments."""
    computer = Computer()
    return computer
@pytest.fixture
def scenario(test_context):
    """Yield the object returned by :func:`make_dantzig` on the test platform.

    The platform is obtained from the `test_context` fixture.
    """
    platform = test_context.get_platform()
    dantzig = make_dantzig(platform)
    yield dantzig
def test_compound_growth():
    """:func:`.compound_growth` on a 2-D quantity."""
    # Labels along the "t" dimension; note the unequal spacing (1, 9, 5)
    periods = [2020, 2021, 2030, 2035]
    rates = [
        [1.01, 1.0, 1.02, 1e6],  # Varying growth rates for x=x1
        [1.0, 1.0, 1.0, 1.0],  # No rates/constant for x=x2
    ]
    qty = Quantity(
        xr.DataArray(rates, coords=(["x1", "x2"], periods), dims=("x", "t"))
    )

    # Function runs
    result = compound_growth(qty, "t")

    # Results have expected values
    growth_x1 = result.sel(x="x1")

    # The first period is the reference, with value 1.0
    assert all(1.0 == growth_x1.sel(t=2020))

    # 1 period at a rate of 1.01
    assert all(1.01 == growth_x1.sel(t=2021) / growth_x1.sel(t=2020))

    # 9 periods at a rate of 1.0
    assert all(1.0 == growth_x1.sel(t=2030) / growth_x1.sel(t=2021))

    # 5 periods at a rate of 1.02
    assert all(1.02**5 == growth_x1.sel(t=2035) / growth_x1.sel(t=2030))

    # Constant rates of 1.0 give 1.0 everywhere
    assert all(1.0 == result.sel(x="x2"))
def test_filter_ts():
    """:func:`.filter_ts` keeps matching rows and rewrites the `variable` column."""
    df = pd.DataFrame({"variable": ["foo", "bar"]})
    assert 2 == len(df)

    # Expression with a single match group; only "bar" matches it
    expr = re.compile(".(ar)")

    # Operator runs
    result = filter_ts(df, expr)

    # Only matching rows are returned
    assert 1 == len(result)

    # Only the first match group in `expr` is preserved
    remaining = set(result.variable.unique())
    assert {"ar"} == remaining
def test_from_url(scenario):
    """:func:`.from_url` retrieves the same object, as the requested class."""
    full_url = f"ixmp://{scenario.platform.name}/{scenario.url}"

    # Operator runs
    as_default = from_url(full_url)

    # Result is of the default class
    assert as_default.__class__ is ixmp.TimeSeries
    # Same object was retrieved
    assert scenario.url == as_default.url

    # Same, but specifying message_ix.Scenario
    as_scenario = from_url(full_url, message_ix.Scenario)

    assert as_scenario.__class__ is message_ix.Scenario
    assert scenario.url == as_scenario.url
def test_get_remove_ts(caplog, scenario):
    """:func:`.get_ts` and :func:`.remove_ts`, called directly and via a Computer."""
    # get_ts() runs
    everything = get_ts(scenario)
    pdt.assert_frame_equal(scenario.timeseries(), everything)

    # Can be used through a Computer
    c = Computer()
    c.require_compat("message_ix_models.report.operator")
    c.add("scenario", scenario)

    get_key = c.add("test1", "get_ts", "scenario", filters={"variable": "GDP"})
    gdp_only = c.get(get_key)
    assert 3 == len(gdp_only)

    # remove_ts() can be used through Computer
    remove_key = c.add("test2", "remove_ts", "scenario", "config", after=1964)

    # Task runs, logs
    # NB this log message is incorrect, because ixmp's JDBCBackend is unable to delete
    #    data stored with "meta=True". Only 1 row is removed
    expected_message = "Remove 2 of 6 (1964 <= year) rows of time series data"
    with assert_logs(caplog, expected_message):
        c.get(remove_key)

    # See comment above; only one row is removed
    assert 6 - 1 == len(scenario.timeseries())

    # remove_ts() can be used directly
    remove_ts(scenario)

    # All non-'meta' data were removed
    assert 3 == len(scenario.timeseries())
def test_gwp_factors():
    """:func:`.gwp_factors` returns a quantity with the expected dimensions."""
    expected_dims = ("gwp metric", "e", "e equivalent")

    result = gwp_factors()

    assert expected_dims == result.dims
@pytest.fixture(scope="module")
def context_with_reporting_data(session_context: Context) -> Iterator[Context]:
    """Yield a copy of `session_context` with reporting data in two places.

    - Files: ``all.csv`` in directories ``test_latest_reporting_s_v3`` and
      ``…_v4`` below the local "report" path, plus an empty ``…_v99`` directory.
    - Platform: clones of a solved Dantzig scenario with model name
      "test_latest_reporting" and scenario name "s". The first two clones
      (versions 1 and 2) receive time series data; a further clone is created
      with ``keep_solution=False`` and receives none.

    The copied context is deleted when the fixture is torn down.
    """
    context = deepcopy(session_context)

    # Common time series data
    columns = ["Model", "Scenario", "Variable", "Region", "Unit", "Year", "Value"]
    data = pd.DataFrame(
        [["m", "s", "", "DantzigLand", "u", 1234, 1.0]], columns=columns
    )

    # NOTE(review): in both loops below, `value=` is lowercase whereas the column
    # created above is "Value", so assign() adds a column rather than replacing
    # one — confirm this is intended.

    # Data for latest_reporting_from_file()
    base_dir = context.get_local_path("report")
    for file_version in (3, 4):
        target = base_dir.joinpath(
            f"test_latest_reporting_s_v{file_version}", "all.csv"
        )
        target.parent.mkdir(parents=True, exist_ok=True)
        file_data = data.assign(
            Variable=f"Variable|File {file_version}", value=float(file_version)
        )
        file_data.to_csv(target, index=False)

    # A dummy/empty directory
    empty_dir = base_dir.joinpath("test_latest_reporting_s_v99")
    empty_dir.mkdir(exist_ok=True)

    # Data for latest_reporting_from_platform()
    mp = context.get_platform()
    s = make_dantzig(mp, solve=True)

    for mp_version in (1, 2):
        s_clone = s.clone(model="test_latest_reporting", scenario="s")
        # Successive clones receive successive version numbers
        assert mp_version == s_clone.version

        s_clone.check_out(timeseries_only=True)
        platform_data = data.assign(
            Variable=f"Variable|Platform {mp_version}", value=float(mp_version)
        )
        s_clone.add_timeseries(platform_data)
        s_clone.commit("")

    # A clone, but with no solution
    s_clone = s.clone(model="test_latest_reporting", scenario="s", keep_solution=False)

    try:
        yield context
    finally:
        context.delete()
@pytest.mark.parametrize(
    "use, variable",
    [
        ({"file"}, "Variable|File 4"),
        ({"platform"}, "Variable|Platform 2"),
        ({"file", "platform"}, "Variable|File 4"),  # Version 4 from file > 2 from mp
    ],
)
def test_latest_reporting(
    caplog: pytest.LogCaptureFixture,
    context_with_reporting_data: Context,
    use: set[str],
    variable: str,
) -> None:
    """:func:`.latest_reporting` with each combination of data sources.

    `use` is passed through to the function; `variable` is a value expected in the
    ``variable`` column of the returned data frame.
    """
    ctx = context_with_reporting_data

    # With missing scenario identifiers
    no_ids = ScenarioInfo()

    # Function runs
    missing = latest_reporting(ctx, no_ids, use=use)

    # Returns None
    assert missing is None
    # Message is logged about unavailable data
    first_message = caplog.messages[0]
    assert "None/None → NO DATA" == first_message

    # With existing data
    with_ids = ScenarioInfo(model="test_latest_reporting", scenario="s")

    # Operator returns a data frame
    found = latest_reporting(ctx, with_ids, use=use)
    assert found is not None

    # Data frame is from the expected source and version
    assert variable in found.variable.unique()
def test_make_output_path(tmp_path, c):
    """:func:`.make_output_path` joins a file name to the configured output_dir."""
    # Configure a Computer, ensuring the output_dir configuration attribute is set
    c.configure(output_dir=tmp_path)

    # Add a computation that invokes make_output_path
    c.add("test", make_output_path, "config", "foo.csv")

    # Returns the correct path
    expected = tmp_path.joinpath("foo.csv")
    assert expected == c.get("test")
def test_model_periods():
    """:func:`.model_periods` returns a list of int with 2020 as its minimum."""
    # Prepare input data
    info = ScenarioInfo()
    info.year_from_codes(get_codes("year/B"))
    cat_year = pd.DataFrame(info.set["cat_year"], columns=["type_year", "year"])

    # Operator runs
    periods = model_periods(info.set["year"], cat_year)

    assert isinstance(periods, list)
    for period in periods:
        assert isinstance(period, int)
    assert 2020 == min(periods)
@pytest.mark.xfail(reason="Incomplete")
def test_share_curtailment():
    """Call :func:`.share_curtailment` with no arguments; marked as expected to fail."""
    share_curtailment()
# Common base directory for the example paths below
_p = Path("/example/base/dir")

# Arguments to summarize(); example of the "transport all" report key as of 2025-11-21
#
# In total these contain 9 paths and 16 None entries, nested at several levels.
SUMMARIZE_ARGS = [
    # 8 paths (3 directly in the base directory) and 1 None
    [
        _p / "base-fe-intensity-gdp.pdf",
        _p / "inv-cost-ldv.pdf",
        _p / "demand.pdf",
        _p / "demand" / "share.pdf",
        None,
        _p / "energy" / "c.pdf",
        _p / "energy" / "c-share.pdf",
        _p / "stock" / "ldv.pdf",
        _p / "stock" / "non-ldv.pdf",
    ],
    # 3 None, one level deeper
    [[None, None], None],
    # 1 path, not nested
    _p / "sdmx",
    # 12 None
    [None] * 12,
]
@pytest.mark.parametrize(
    "args",
    (
        (SUMMARIZE_ARGS,),  # Single list arg to summarize() containing further items
        SUMMARIZE_ARGS,  # Multiple positional args to summarize()
    ),
)
def test_summarize(args) -> None:
    """:func:`.summarize` gives the same text for nested or positional arguments.

    `args` is unpacked into the call, so the two parameter sets pass
    SUMMARIZE_ARGS either as one list argument or as separate positional
    arguments. Both are checked against the same expected pattern.
    """
    # Operator runs
    result = summarize(*args)

    # Results as expected
    #
    # Group 1 captures the escaped path separator of the current platform; each \1
    # backreference then requires the same separator.
    #
    # NOTE(review): the triple-quoted pattern below is reproduced exactly as it
    # appears in the available source, where it is on a single line. Line breaks
    # and indentation inside the literal appear to have been lost — confirm
    # against the repository before relying on it.
    assert re.fullmatch(
        rf""" 9 file or directory paths: 4 in ({re.escape(os.sep)})example\1base\1dir\1 1 in \1example\1base\1dir\1demand\1 2 in \1example\1base\1dir\1energy\1 2 in \1example\1base\1dir\1stock\1 16 × None 0 other items""",
        result,
    )