Source code for message_ix_models.project.scenariomip.workflow

"""Generate the ScenarioMIP workflow."""

import logging
from typing import TYPE_CHECKING, Optional

from message_ix_models.workflow import Workflow

if TYPE_CHECKING:
    from message_ix import Scenario

    from message_ix_models import Context

log = logging.getLogger(__name__)


[docs] def step_01(context: "Context", scenario: "Scenario") -> "Scenario": """Clean Timeseries.""" log.info("Not implemented: step_01(…)") return scenario
[docs] def step_02(context: "Context", scenario: "Scenario") -> "Scenario": """Add Land-use Emulator.""" log.info("Not implemented: step_02(…)") return scenario
[docs] def step_03(context: "Context", scenario: "Scenario") -> "Scenario": """Add Biomass Trade.""" log.info("Not implemented: step_03(…)") return scenario
[docs] def step_04(context: "Context", scenario: "Scenario") -> "Scenario": """Solve for Historic Reporting.""" log.info("Not implemented: step_04(…)") return scenario
[docs] def step_05(context: "Context", scenario: "Scenario") -> "Scenario": """Build Materials.""" log.info("Not implemented: step_05(…)") return scenario
[docs] def step_06(context: "Context", scenario: "Scenario") -> "Scenario": """Add DAC (Direct Air Capture).""" log.info("Not implemented: step_06(…)") return scenario
[docs] def step_07(context: "Context", scenario: "Scenario") -> "Scenario": """Add Non-CO2 GHGs.""" log.info("Not implemented: step_07(…)") return scenario
[docs] def step_08(context: "Context", scenario: "Scenario") -> "Scenario": """Add Water.""" log.info("Not implemented: step_08(…)") return scenario
[docs] def step_09(context: "Context", scenario: "Scenario") -> "Scenario": """Add Techno-economic Parameters.""" log.info("Not implemented: step_09(…)") return scenario
[docs] def step_10(context: "Context", scenario: "Scenario") -> "Scenario": """Add Balance Equalities (some petrochemical trade balances?).""" log.info("Not implemented: step_10(…)") return scenario
[docs] def step_11(context: "Context", scenario: "Scenario") -> "Scenario": """OBSOLETE.""" log.info("Not implemented: step_11(…)") return scenario
[docs] def step_12(context: "Context", scenario: "Scenario") -> "Scenario": """Gas Mix Limitations (constrain gas use in industries).""" log.info("Not implemented: step_12(…)") return scenario
[docs] def step_13(context: "Context", scenario: "Scenario") -> "Scenario": """Add Slack and Constraints (so we need to update the slacks).""" log.info("Not implemented: step_13(…)") return scenario
[docs] def step_14(context: "Context", scenario: "Scenario") -> "Scenario": """Add Shipping.""" log.info("Not implemented: step_14(…)") return scenario
[docs] def step_15(context: "Context", scenario: "Scenario") -> "Scenario": log.info("Not implemented: step_15(…)") return scenario
[docs] def step_16(context: "Context", scenario: "Scenario") -> "Scenario": """Calibrate Macro.""" log.info("Not implemented: step_16(…)") return scenario
#: Order of steps in the workflow. STEP_ORDER = [ step_01, step_02, step_03, step_04, step_05, step_06, step_07, step_08, step_09, step_10, step_11, step_12, step_13, step_14, step_15, step_16, ]
[docs] def generate(context: "Context", step_order: Optional[list] = None) -> "Workflow": """Generate the ScenarioMIP workflow. Parameters ---------- step_order Order of steps in the workflow. If not given, :data:`STEP_ORDER` is used. """ wf = Workflow(context) prev = wf.add_step("base", None) step_order = step_order or STEP_ORDER for i, func in enumerate(step_order, start=1): prev = wf.add_step(f"step_{i}", prev, func) # "all" is an alias for the last step, whatever it is wf.add("all", prev) return wf