Source code for message_ix_models.tests.model.water.test_report

import os.path
from typing import Any

import numpy as np
import pandas as pd
import pytest
from message_ix import Scenario

from message_ix_models import ScenarioInfo
from message_ix_models.model.structure import get_codes
from message_ix_models.model.water.report import (
    ScenarioMetadata,
    aggregate_totals,
    get_population_values,
    process_rates,
    report_full,
)
from message_ix_models.util import package_data_path


# NB: this tests all functions in model/water/report
@pytest.mark.xfail(reason="Temporary, for #106")
def test_report_full(test_context: Any, request: pytest.FixtureRequest) -> None:
    """Build and solve a bare scenario, run ``report_full``, check the CSV exists.

    The scenario only carries a three-period horizon, two technologies and the
    child nodes of "World" from the R12 code list. ``report_full`` returns
    nothing, so the only assertion is on the expected output file path.
    """
    # FIXME You probably want this to be part of a common setup rather than writing
    # something like this for every test
    test_context.time = "year"
    test_context.type_reg = "global"
    test_context.regions = "R12"

    # Collect the names of the nodes directly below "World"
    all_codes = get_codes(f"node/{test_context.regions}")
    world_matches = [code for code in all_codes if str(code) == "World"]
    nodes = list(map(str, world_matches[0].child))
    # test_context.map_ISO_c = {test_context.regions: nodes[0]}

    # Model and scenario names embed the test name (request.node.name)
    s = Scenario(
        mp=test_context.get_platform(),
        model=f"{request.node.name}/test water model",
        scenario=f"{request.node.name}/test water scenario",
        version="new",
    )

    # Minimal structure: horizon, technologies, years and nodes
    s.add_horizon(year=[2020, 2030, 2040])
    s.add_set("technology", ["tech1", "tech2"])
    s.add_set("year", [2020, 2030, 2040])
    s.add_set("node", nodes)

    s.commit(comment="basic water report_full test model")
    s.set_as_default()

    # Remove quiet=True to debug using the output
    s.solve(quiet=True)

    test_context.set_scenario(s)
    # FIXME same as above
    test_context["water build info"] = ScenarioInfo(s)

    # Run the function to be tested
    report_full(sc=s, reg=test_context.regions, ssp="SSP2")

    # Since the function doesn't return anything, check that output file is produced
    # in correct location
    output_root = package_data_path().parents[0]
    result_file = output_root / f"reporting_output/{s.model}_{s.scenario}.csv"
    assert os.path.isfile(result_file)
@pytest.mark.parametrize(
    "population_type,expected_connection_var,expected_access_var",
    [
        (
            "urban",
            "Connection Rate|Drinking Water|Urban",
            "Population|Drinking Water Access|Urban",
        ),
        (
            "rural",
            "Connection Rate|Drinking Water|Rural",
            "Population|Drinking Water Access|Rural",
        ),
    ],
)
def test_process_rates(
    population_type: str, expected_connection_var: str, expected_access_var: str
) -> None:
    """Check the entries ``process_rates`` emits for one population type.

    For a population of 1000 with a connection rate of 0.8 and a treatment rate
    of 0.6, five entries are expected; the population, connection-rate and
    drinking-water-access entries are inspected individually.
    """
    # Fixed inputs shared by both parametrized cases
    people = 1000.0
    node = "R12_AFR"
    period = 2030
    meta: ScenarioMetadata = {
        "model": "test_model",
        "scenario": "test_scenario",
        "unit": "million",
    }

    # Mock rates table: one connection rate and one treatment rate
    rates = pd.DataFrame(
        {
            "variable": [
                f"{population_type}_water_connection_rate",
                f"{population_type}_water_treatment_rate",
            ],
            "value": [0.8, 0.6],
        }
    )

    entries = process_rates(population_type, people, rates, node, period, meta)

    def entry_for(variable: str):
        """Return the first entry whose "variable" field equals `variable`."""
        return next(item for item in entries if item["variable"] == variable)

    # Should return 5 entries: population + 2 rates + 2 access calculations
    assert len(entries) == 5

    # Population entry echoes the inputs
    population = entry_for(f"Population|{population_type.capitalize()}")
    assert population["value"] == people
    assert population["region"] == node
    assert population["year"] == period

    # Connection rate is passed through unchanged
    connection = entry_for(expected_connection_var)
    assert connection["value"] == 0.8

    # Drinking water access = population * connection rate
    access = entry_for(expected_access_var)
    assert access["value"] == 800.0  # 1000 * 0.8
def test_get_population_values() -> None:
    """Check ``get_population_values`` for present, partial and absent data.

    The function is expected to return an ``(urban, rural)`` pair for a region
    and year, with NaN standing in for any value missing from the input frame.
    """
    # Mock population table: R12_AFR has both series, R12_CHN only the urban one
    population = pd.DataFrame(
        {
            "region": ["R12_AFR", "R12_AFR", "R12_CHN"],
            "year": [2030, 2030, 2030],
            "variable": ["Population|Urban", "Population|Rural", "Population|Urban"],
            "value": [500.0, 300.0, 800.0],
        }
    )

    # Both values available
    urban, rural = get_population_values(population, "R12_AFR", 2030)
    assert urban == 500.0
    assert rural == 300.0

    # Rural value missing for this region
    urban, rural = get_population_values(population, "R12_CHN", 2030)
    assert urban == 800.0
    assert np.isnan(rural)

    # Region/year combination not present at all
    urban, rural = get_population_values(population, "R12_IND", 2040)
    assert np.isnan(urban)
    assert np.isnan(rural)
def test_aggregate_totals() -> None:
    """Check that ``aggregate_totals`` sums urban and rural rows per category.

    Given urban/rural rows for drinking water access and for population in one
    region and year, two single-row frames are expected: one for
    "Population|Drinking Water Access" and one for "Population".
    """
    # Metadata columns identical on every input row
    shared = {"model": "test_model", "scenario": "test_scenario", "unit": "million"}
    series = [
        ("Population|Drinking Water Access|Urban", 400.0),
        ("Population|Drinking Water Access|Rural", 240.0),
        ("Population|Urban", 500.0),
        ("Population|Rural", 300.0),
    ]
    frame = pd.DataFrame(
        [
            {
                "region": "R12_AFR",
                "year": 2030,
                "variable": variable,
                "value": value,
                **shared,
            }
            for variable, value in series
        ]
    )

    totals = aggregate_totals(frame)

    def first_variable(df: pd.DataFrame) -> str:
        """Return the "variable" value of the first row of `df`."""
        return df["variable"].iloc[0]

    # Should return 2 aggregated DataFrames (drinking water access + population)
    assert len(totals) == 2

    # Drinking water access total
    access_total = next(
        df for df in totals if "Drinking Water Access" in first_variable(df)
    )
    assert len(access_total) == 1
    assert first_variable(access_total) == "Population|Drinking Water Access"
    assert access_total["value"].iloc[0] == 640.0  # 400 + 240

    # Population total
    population_total = next(df for df in totals if first_variable(df) == "Population")
    assert len(population_total) == 1
    assert population_total["value"].iloc[0] == 800.0  # 500 + 300