import logging
import os
import shutil
from base64 import b32hexencode
from copy import deepcopy
from pathlib import Path
from random import randbytes
from tempfile import TemporaryDirectory
from typing import Generator
import message_ix
import pandas as pd
import pytest
from ixmp import config as ixmp_config
from message_ix_models import util
from message_ix_models.model import snapshot
from message_ix_models.util._logging import mark_time
from message_ix_models.util.context import Context
log = logging.getLogger(__name__)
# pytest hooks
[docs]def pytest_addoption(parser):
"""Add two command-line options to pytest:
``--local-cache``
Use existing, local cache files in tests. This option can speed up tests that
*use* the results of slow data loading/parsing. However, if cached values are not
up to date with the current code, unexpected failure may occur.
``--jvmargs``
Additional arguments to give for the Java Virtual Machine used by :mod:`ixmp`'s
:class:`.JDBCBackend`. Used by :func:`session_context`.
"""
parser.addoption(
"--local-cache",
action="store_true",
help="Use existing local cache files in tests",
)
parser.addoption(
"--jvmargs",
action="store",
default="",
help="Arguments for Java VM used by ixmp JDBCBackend",
)
# Fixtures
[docs]@pytest.fixture(scope="function")
def preserve_report_callbacks():
"""Protect :data:`.report.CALLBACKS` from effects of a test function.
Use this fixture for test functions that call :func:`.report.register` to avoid
changing the global/default configuration for other tests.
"""
from message_ix_models import report
try:
tmp = report.CALLBACKS.copy()
yield
finally:
report.CALLBACKS.clear()
report.CALLBACKS.extend(tmp)
[docs]@pytest.fixture(scope="session")
def session_context(pytestconfig, tmp_env):
"""A :class:`.Context` connected to a temporary, in-memory database.
This Context is suitable for modifying and running test code that does not affect
the user/developer's filesystem and configured :mod:`ixmp` databases.
Uses the :func:`.tmp_env` fixture from ixmp. This fixture also sets:
- :attr:`.Config.cache_path`, depending on whether the :program:`--local-cache` CLI
option was given:
- If not given: pytest's :doc:`standard cache directory <pytest:how-to/cache>`.
- If given: the :file:`/cache/` directory under the user's "message local data"
directory.
- the "message local data" config key to a temporary directory :file:`/data/` under
the :ref:`pytest tmp_path directory <pytest:tmp_path>`.
"""
from platformdirs import user_cache_path
ctx = Context.only()
# Temporary, empty local directory for local data
session_tmp_dir = Path(pytestconfig._tmp_path_factory.mktemp("data"))
# Set the cache path according to whether pytest --local-cache was given. If True,
# pick up the existing setting from the user environment. If False, use a pytest-
# managed cache directory that persists across test sessions.
ctx.cache_path = (
user_cache_path("message-ix-models", ensure_exists=True)
if pytestconfig.option.local_cache
# TODO use pytestconfig.cache.mkdir() when pytest >= 6.3 is available
else Path(pytestconfig.cache.makedir("cache"))
)
# Other local data in the temporary directory for this session only
ctx.local_data = session_tmp_dir
# Also set the "message local data" key in the ixmp config
ixmp_config.set("message local data", session_tmp_dir)
# If message_data is not installed, use a temporary path for private_data_path()
message_data_path = util.MESSAGE_DATA_PATH
if util.MESSAGE_DATA_PATH is None:
util.MESSAGE_DATA_PATH = session_tmp_dir.joinpath("message_data")
# Create some subdirectories
util.MESSAGE_DATA_PATH.joinpath("data", "tests").mkdir(parents=True)
# Add a platform connected to an in-memory database
platform_name = "message-ix-models"
ixmp_config.add_platform(
platform_name,
"jdbc",
"hsqldb",
url=f"jdbc:hsqldb:mem://{platform_name}",
jvmargs=pytestconfig.option.jvmargs,
)
ixmp_config.save()
ctx.platform_info["name"] = platform_name
try:
yield ctx
finally:
ctx.close_db()
ixmp_config.remove_platform(platform_name)
# Restore prior value
util.MESSAGE_DATA_PATH = message_data_path
[docs]@pytest.fixture(scope="function")
def test_context(request, session_context):
"""A copy of :func:`session_context` scoped to one test function."""
ctx = deepcopy(session_context)
# Ensure there is a report key
ctx.setdefault("report", dict())
yield ctx
ctx.delete()
[docs]@pytest.fixture(scope="function")
def user_context(request): # pragma: no cover
"""Context which can access user's configuration, e.g. platform names."""
# Disabled; this is bad practice
raise NotImplementedError
[docs]@pytest.fixture
def mix_models_cli(session_context, tmp_env):
"""A :class:`.CliRunner` object that invokes the :program:`mix-models` CLI.
NB this requires:
- The :mod:`ixmp` :func:`.tmp_env` fixture. This sets ``IXMP_DATA`` to a temporary
directory managed by :mod:`pytest`.
- The :func:`session_context` fixture. This (a) sets :attr:`.Config.local_data` to
a temporary directory within ``IXMP_DATA`` and (b) ensures changes to
:class:`.Context` made by invoked commands do not reach other tests.
"""
from message_ix_models import cli
from message_ix_models.util.click import CliRunner
yield CliRunner(cli.main, cli.__name__, env=tmp_env)
# Testing utility functions
[docs]def bare_res(request, context: Context, solved: bool = False) -> message_ix.Scenario:
"""Return or create a :class:`.Scenario` containing the bare RES for use in testing.
The Scenario has a model name like "MESSAGEix-GLOBIOM [regions] Y[years]", for
instance "MESSAGEix-GLOBIOM R14 YB" (see :func:`.bare.name`) and a scenario name
either from :py:`request.node.name` or "baseline" plus a random string.
This function should:
- only be called from within test code, i.e. in :mod:`message_data.tests`.
- be called once for each test function, so that each test receives a fresh copy of
the RES scenario.
Parameters
----------
request : .FixtureRequest or None
The pytest :fixture:`pytest:request` fixture. If provided the pytest test node
name is used for the scenario name of the returned Scenario.
context : .Context
Passed to :func:`.testing.bare_res`.
solved : bool, optional
Return a solved Scenario.
Returns
-------
Scenario
The scenario is a fresh clone, so can be modified freely without disturbing
other tests.
"""
from message_ix_models.model import bare
name = bare.name(context)
mp = context.get_platform()
try:
base = message_ix.Scenario(mp, name, "baseline")
except ValueError:
log.info(f"Create '{name}/baseline' for testing")
context.scenario_info.update(model=name, scenario="baseline")
base = bare.create_res(context)
if solved and not base.has_solution():
log.info("Solve")
base.solve(solve_options=dict(lpmethod=4), quiet=True)
try:
new_name = request.node.name
except AttributeError:
# Generate a new scenario name with a random part
new_name = f"baseline {b32hexencode(randbytes(3)).decode().rstrip('=').lower()}"
log.info(f"Clone to '{name}/{new_name}'")
return base.clone(scenario=new_name, keep_solution=solved)
#: Items with names that match (partially or fully) these names are omitted by
#: :func:`export_test_data`.
EXPORT_OMIT = [
"aeei",
"cost_MESSAGE",
"demand_MESSAGE",
"demand",
"depr",
"esub",
"gdp_calibrate",
"grow",
"historical_gdp",
"kgdp",
"kpvs",
"lakl",
"land",
"lotol",
"mapping_macro_sector",
"MERtoPPP",
"prfconst",
"price_MESSAGE",
"ref_",
"sector",
]
[docs]def export_test_data(context: Context):
"""Export a subset of data from a scenario, for use in tests.
The context settings ``export_nodes`` (default: "R11_AFR" and "R11_CPA") and
``export_techs`` (default: "coal_ppl") are used to filter the data exported.
In addition, any item (set, parameter, variable, or equation) with a name matching
:data:`EXPORT_OMIT` *or* the context setting ``export_exclude`` is discarded.
The output is stored at :file:`data/tests/{model name}_{scenario name}_{techs}.xlsx`
in :mod:`message_data`.
See also
--------
:ref:`export-test-data`
"""
from message_ix_models.util import private_data_path
# Load the scenario to be exported
scen = context.get_scenario()
# Retrieve the context settings giving the nodes and technologies to export
nodes = context.get("export_nodes", ["R11_AFR", "R11_CPA"])
technology = context.get("export_techs", ["coal_ppl"])
# Construct the destination file name
dest_file = private_data_path(
"tests", f"{scen.model}_{scen.scenario}_{'_'.join(technology)}.xlsx"
)
# Temporary file name
td = TemporaryDirectory()
tmp_file = Path(td.name).joinpath("export_test_data.xlsx")
# Ensure the target directory exists
dest_file.parent.mkdir(exist_ok=True)
# Dump data to temporary Excel file
log.info(f"Export test data to {dest_file}")
scen.to_excel(
tmp_file,
filters={
"technology": technology,
"node": nodes,
"node_dest": nodes,
"node_loc": nodes,
"node_origin": nodes,
"node_parent": nodes,
"node_rel": nodes,
"node_share": nodes,
},
)
mark_time()
log.info("Reduce test data")
# Read from temporary file and write to final file, omitting unnecessary sheets
reader = pd.ExcelFile(tmp_file)
writer = pd.ExcelWriter(dest_file)
# Retrieve the type mapping first, to be modified as sheets are discarded
ix_type_mapping = reader.parse("ix_type_mapping").set_index("item")
for name in reader.sheet_names:
# Check if this sheet is to be included
if name == "ix_type_mapping":
# Already handled
continue
elif any(i in name for i in (EXPORT_OMIT + context.get("export_exclude", []))):
log.info(f"Discard sheet '{name}'")
# Remove from the mapping
ix_type_mapping.drop(name, inplace=True)
continue
# Copy the sheet from temporary to final file
reader.parse(name).to_excel(writer, sheet_name=name, index=False)
# Close the temporary file
reader.close()
# Write the mapping
ix_type_mapping.reset_index().to_excel(
writer, sheet_name="ix_type_mapping", index=False
)
# Close the final file
writer.close()
mark_time()
#: Shorthand for marking a parametrized test case that is expected to fail because it is
#: not implemented.
NIE = pytest.mark.xfail(raises=NotImplementedError)
#: :data:`True` if tests occur on GitHub Actions.
GHA = "GITHUB_ACTIONS" in os.environ
[docs]def not_ci(reason=None, action="skip"):
"""Mark a test as xfail or skipif if on CI infrastructure.
Checks the ``GITHUB_ACTIONS`` environment variable; returns a pytest mark.
"""
action = "skipif" if action == "skip" else action
return getattr(pytest.mark, action)(condition=GHA, reason=reason)
[docs]def unpack_snapshot_data(context: Context, snapshot_id: int):
"""Already-unpacked data for a snapshot.
This copies the .csv.gz files from message_ix_models/data/test/… to the directory
where they *would* be unpacked by .model.snapshot._unpack. This causes the code to
skip unpacking them, which can be very slow.
"""
if snapshot_id not in (0, 1):
log.info(f"No unpacked data for snapshot {snapshot_id}")
return
parts = (f"snapshot-{snapshot_id}", "MESSAGEix-GLOBIOM_1.1_R11_no-policy_baseline")
dest = context.get_cache_path(*parts)
log.debug(f"{dest = }")
snapshot_data_path = util.package_data_path("test", *parts)
log.debug(f"{snapshot_data_path = }")
shutil.copytree(snapshot_data_path, dest, dirs_exist_ok=True)
@pytest.fixture(
scope="session",
params=[
int(k.split("-")[1]) for k in util.pooch.SOURCE if k.startswith("snapshot")
],
)
def loaded_snapshot(
request, session_context, solved: bool = False
) -> Generator[message_ix.Scenario, None, None]:
snapshot_id: int = request.param
assert snapshot_id is not None
unpack_snapshot_data(context=session_context, snapshot_id=snapshot_id)
model_name = "MESSAGEix-GLOBIOM_1.1_R11_no-policy"
scenario_name = f"baseline_v{snapshot_id}"
mp = session_context.get_platform()
# The following code roughly parallels bare_res()
try:
base = message_ix.Scenario(mp, model=model_name, scenario=scenario_name)
except ValueError:
log.info(f"Create '{model_name}/{scenario_name}' for testing")
session_context.scenario_info.update(model=model_name, scenario=scenario_name)
base = message_ix.Scenario(
mp, model=model_name, scenario=scenario_name, version="new"
)
snapshot.load(
scenario=base,
snapshot_id=snapshot_id,
extra_cache_path=f"snapshot-{snapshot_id}",
)
if solved and not base.has_solution():
log.info("Solve")
base.solve(solve_options=dict(lpmethod=4), quiet=True)
try:
new_name = request.node.name
except AttributeError:
# Generate a new scenario name with a random part
new_name = f"baseline {b32hexencode(randbytes(3)).decode().rstrip('=').lower()}"
log.info(f"Clone to '{model_name}/{new_name}'")
yield base.clone(scenario=new_name, keep_solution=solved)