import logging
import os
import shutil
try:
from base64 import b32hexencode as b32encode
except ImportError:
from base64 import b32encode
from collections.abc import Generator
from copy import deepcopy
from pathlib import Path
from random import randbytes
from tempfile import TemporaryDirectory
import message_ix
import pandas as pd
import pytest
from ixmp import config as ixmp_config
from message_ix_models import util
from message_ix_models.model import snapshot
from message_ix_models.util._logging import mark_time
from message_ix_models.util.context import Context
log = logging.getLogger(__name__)
# pytest hooks
[docs]
def pytest_addoption(parser):
"""Add two command-line options to pytest:
``--local-cache``
Use existing, local cache files in tests. This option can speed up tests that
*use* the results of slow data loading/parsing. However, if cached values are not
up to date with the current code, unexpected failure may occur.
``--jvmargs``
Additional arguments to give for the Java Virtual Machine used by :mod:`ixmp`'s
:class:`.JDBCBackend`. Used by :func:`session_context`.
"""
parser.addoption(
"--local-cache",
action="store_true",
help="Use existing local cache files in tests",
)
parser.addoption(
"--jvmargs",
action="store",
default="",
help="Arguments for Java VM used by ixmp JDBCBackend",
)
# Fixtures
[docs]
@pytest.fixture(scope="session")
def session_context(pytestconfig, tmp_env):
"""A :class:`.Context` connected to a temporary, in-memory database.
This Context is suitable for modifying and running test code that does not affect
the user/developer's filesystem and configured :mod:`ixmp` databases.
Uses the :func:`.tmp_env` fixture from ixmp. This fixture also sets:
- :attr:`.Config.cache_path`, depending on whether the :program:`--local-cache` CLI
option was given:
- If not given: pytest's :doc:`standard cache directory <pytest:how-to/cache>`.
- If given: the :file:`/cache/` directory under the user's "message local data"
directory.
- the "message local data" config key to a temporary directory :file:`/data/` under
the :ref:`pytest tmp_path directory <pytest:tmp_path>`.
"""
from platformdirs import user_cache_path
ctx = Context.only()
# Temporary, empty local directory for local data
session_tmp_dir = Path(pytestconfig._tmp_path_factory.mktemp("data"))
# Set the cache path according to whether pytest --local-cache was given. If True,
# pick up the existing setting from the user environment. If False, use a pytest-
# managed cache directory that persists across test sessions.
ctx.cache_path = (
user_cache_path("message-ix-models", ensure_exists=True)
if pytestconfig.option.local_cache
# TODO use pytestconfig.cache.mkdir() when pytest >= 6.3 is available
else Path(pytestconfig.cache.makedir("cache"))
)
# Store current .util.config.Config.local_data setting from the user's configuration
pytestconfig.user_local_data = ctx.core.local_data
# Other local data in the temporary directory for this session only
ctx.core.local_data = session_tmp_dir
# Also set the "message local data" key in the ixmp config
ixmp_config.set("message local data", session_tmp_dir)
# If message_data is not installed, use a temporary path for private_data_path()
message_data_path = util.MESSAGE_DATA_PATH
if util.MESSAGE_DATA_PATH is None:
util.MESSAGE_DATA_PATH = session_tmp_dir.joinpath("message_data")
# Create some subdirectories
util.MESSAGE_DATA_PATH.joinpath("data", "tests").mkdir(parents=True)
# Add a platform connected to an in-memory database
platform_name = "message-ix-models"
ixmp_config.add_platform(
platform_name,
"jdbc",
"hsqldb",
url=f"jdbc:hsqldb:mem://{platform_name}",
jvmargs=pytestconfig.option.jvmargs,
)
ixmp_config.save()
ctx.platform_info["name"] = platform_name
try:
yield ctx
finally:
ctx.close_db()
ixmp_config.remove_platform(platform_name)
# Restore prior value
util.MESSAGE_DATA_PATH = message_data_path
[docs]
@pytest.fixture(scope="function")
def test_context(request, session_context):
"""A copy of :func:`session_context` scoped to one test function."""
try:
ctx = deepcopy(session_context)
except Exception:
print(repr(session_context._values))
raise
yield ctx
ctx.delete()
[docs]
@pytest.fixture(scope="function")
def user_context(request): # pragma: no cover
"""Context which can access user's configuration, e.g. platform names."""
# Disabled; this is bad practice
raise NotImplementedError
[docs]
@pytest.fixture
def mix_models_cli(session_context, tmp_env):
"""A :class:`.CliRunner` object that invokes the :program:`mix-models` CLI.
NB this requires:
- The :mod:`ixmp` :func:`.tmp_env` fixture. This sets ``IXMP_DATA`` to a temporary
directory managed by :mod:`pytest`.
- The :func:`session_context` fixture. This (a) sets :attr:`.Config.local_data` to
a temporary directory within ``IXMP_DATA`` and (b) ensures changes to
:class:`.Context` made by invoked commands do not reach other tests.
"""
from message_ix_models import cli
from message_ix_models.util.click import CliRunner
yield CliRunner(cli.main, cli.__name__, env=tmp_env)
# Testing utility functions
[docs]
def bare_res(request, context: Context, solved: bool = False) -> message_ix.Scenario:
"""Return or create a :class:`.Scenario` containing the bare RES for use in testing.
The Scenario has a model name like "MESSAGEix-GLOBIOM [regions] Y[years]", for
instance "MESSAGEix-GLOBIOM R14 YB" (see :func:`.bare.name`) and a scenario name
either from :py:`request.node.name` or "baseline" plus a random string.
This function should:
- only be called from within test code, i.e. in :mod:`message_data.tests`.
- be called once for each test function, so that each test receives a fresh copy of
the RES scenario.
Parameters
----------
request : .FixtureRequest or None
The pytest :fixture:`pytest:request` fixture. If provided the pytest test node
name is used for the scenario name of the returned Scenario.
context : .Context
Passed to :func:`.testing.bare_res`.
solved : bool, optional
Return a solved Scenario.
Returns
-------
Scenario
The scenario is a fresh clone, so can be modified freely without disturbing
other tests.
"""
from message_ix_models.model import bare
# Model name: standard "MESSAGEix-GLOBIOM R12 YB" plus a suffix
log.info(f"bare_res: {context.model.regions = }")
model_name = bare.name(context, unique=True)
mp = context.get_platform()
try:
base = message_ix.Scenario(mp, model_name, "baseline")
except ValueError:
log.info(f"Create '{model_name}/baseline' for testing")
context.scenario_info.update(model=model_name, scenario="baseline")
base = bare.create_res(context)
log.info(f"base.set('node') = {' '.join(sorted(base.set('node')))}")
if solved and not base.has_solution():
log.info("Solve")
base.solve(solve_options=dict(lpmethod=4), quiet=True)
try:
new_name = request.node.name
except AttributeError:
# Generate a new scenario name with a random part
new_name = f"baseline {b32encode(randbytes(3)).decode().rstrip('=').lower()}"
log.info(f"Clone to '{model_name}/{new_name}'")
return base.clone(scenario=new_name, keep_solution=solved)
#: Items with names that match (partially or fully) these names are omitted by
#: :func:`export_test_data`.
EXPORT_OMIT = [
"aeei",
"cost_MESSAGE",
"demand_MESSAGE",
"demand",
"depr",
"esub",
"gdp_calibrate",
"grow",
"historical_gdp",
"kgdp",
"kpvs",
"lakl",
"land",
"lotol",
"mapping_macro_sector",
"MERtoPPP",
"prfconst",
"price_MESSAGE",
"ref_",
"sector",
]
[docs]
def export_test_data(context: Context):
"""Export a subset of data from a scenario, for use in tests.
The context settings ``export_nodes`` (default: "R11_AFR" and "R11_CPA") and
``export_techs`` (default: "coal_ppl") are used to filter the data exported.
In addition, any item (set, parameter, variable, or equation) with a name matching
:data:`EXPORT_OMIT` *or* the context setting ``export_exclude`` is discarded.
The output is stored at :file:`data/tests/{model name}_{scenario name}_{techs}.xlsx`
in :mod:`message_data`.
See also
--------
:ref:`export-test-data`
"""
from message_ix_models.util import private_data_path
# Load the scenario to be exported
scen = context.get_scenario()
# Retrieve the context settings giving the nodes and technologies to export
nodes = context.get("export_nodes", ["R11_AFR", "R11_CPA"])
technology = context.get("export_techs", ["coal_ppl"])
# Construct the destination file name
dest_file = private_data_path(
"tests", f"{scen.model}_{scen.scenario}_{'_'.join(technology)}.xlsx"
)
# Temporary file name
td = TemporaryDirectory()
tmp_file = Path(td.name).joinpath("export_test_data.xlsx")
# Ensure the target directory exists
dest_file.parent.mkdir(exist_ok=True)
# Dump data to temporary Excel file
log.info(f"Export test data to {dest_file}")
scen.to_excel(
tmp_file,
filters={
"technology": technology,
"node": nodes,
"node_dest": nodes,
"node_loc": nodes,
"node_origin": nodes,
"node_parent": nodes,
"node_rel": nodes,
"node_share": nodes,
},
)
mark_time()
log.info("Reduce test data")
# Read from temporary file and write to final file, omitting unnecessary sheets
reader = pd.ExcelFile(tmp_file)
writer = pd.ExcelWriter(dest_file)
# Retrieve the type mapping first, to be modified as sheets are discarded
ix_type_mapping = reader.parse("ix_type_mapping").set_index("item")
for name in map(str, reader.sheet_names):
# Check if this sheet is to be included
if name == "ix_type_mapping":
# Already handled
continue
elif any(i in name for i in (EXPORT_OMIT + context.get("export_exclude", []))):
log.info(f"Discard sheet '{name}'")
# Remove from the mapping
ix_type_mapping.drop(name, inplace=True)
continue
# Copy the sheet from temporary to final file
reader.parse(name).to_excel(writer, sheet_name=name, index=False)
# Close the temporary file
reader.close()
# Write the mapping
ix_type_mapping.reset_index().to_excel(
writer, sheet_name="ix_type_mapping", index=False
)
# Close the final file
writer.close()
mark_time()
#: Shorthand for marking a parametrized test case that is expected to fail because it is
#: not implemented.
NIE = pytest.mark.xfail(raises=NotImplementedError)
#: :data:`True` if tests occur on GitHub Actions.
GHA = "GITHUB_ACTIONS" in os.environ
[docs]
def not_ci(reason=None, action="skip"):
"""Mark a test as xfail or skipif if on CI infrastructure.
Checks the ``GITHUB_ACTIONS`` environment variable; returns a pytest mark.
"""
action = "skipif" if action == "skip" else action
return getattr(pytest.mark, action)(condition=GHA, reason=reason)
[docs]
def unpack_snapshot_data(context: Context, snapshot_id: int):
"""Already-unpacked data for a snapshot.
This copies the .csv.gz files from message_ix_models/data/test/… to the directory
where they *would* be unpacked by .model.snapshot._unpack. This causes the code to
skip unpacking them, which can be very slow.
"""
if snapshot_id not in (0, 1):
log.info(f"No unpacked data for snapshot {snapshot_id}")
return
parts = (f"snapshot-{snapshot_id}", "MESSAGEix-GLOBIOM_1.1_R11_no-policy_baseline")
dest = context.get_cache_path(*parts)
log.debug(f"{dest = }")
snapshot_data_path = util.package_data_path("test", *parts)
log.debug(f"{snapshot_data_path = }")
shutil.copytree(snapshot_data_path, dest, dirs_exist_ok=True)
@pytest.fixture(
scope="session",
params=[
int(k.split("-")[1]) for k in util.pooch.SOURCE if k.startswith("snapshot")
],
)
def loaded_snapshot(
request, session_context, solved: bool = False
) -> Generator[message_ix.Scenario, None, None]:
snapshot_id: int = request.param
assert snapshot_id is not None
unpack_snapshot_data(context=session_context, snapshot_id=snapshot_id)
model_name = "MESSAGEix-GLOBIOM_1.1_R11_no-policy"
scenario_name = f"baseline_v{snapshot_id}"
mp = session_context.get_platform()
# The following code roughly parallels bare_res()
try:
base = message_ix.Scenario(mp, model=model_name, scenario=scenario_name)
except ValueError:
log.info(f"Create '{model_name}/{scenario_name}' for testing")
session_context.scenario_info.update(model=model_name, scenario=scenario_name)
base = message_ix.Scenario(
mp, model=model_name, scenario=scenario_name, version="new"
)
snapshot.load(
scenario=base,
snapshot_id=snapshot_id,
extra_cache_path=f"snapshot-{snapshot_id}",
)
if solved and not base.has_solution():
log.info("Solve")
base.solve(solve_options=dict(lpmethod=4), quiet=True)
try:
new_name = request.node.name
except AttributeError:
# Generate a new scenario name with a random part
new_name = f"baseline {b32encode(randbytes(3)).decode().rstrip('=').lower()}"
log.info(f"Clone to '{model_name}/{new_name}'")
yield base.clone(scenario=new_name, keep_solution=solved)