Source code for message_ix_models.tools.messagev

"""Tools for extracting data from MESSAGE V."""

import re
from functools import lru_cache

import numpy as np
import pandas as pd


[docs]class CHNFile: """Reader for MESSAGE V ``.chn`` files.""" index = {} # FIXME reduce complexity from 15 to ≤14 def __init__(self, path): # noqa: C901 """Parse .chn file.""" def _depth(str): return (len(str.replace("\t", " ")) - len(str.lstrip())) // 2 stack = [] self.data = {} for line in open(path): if line.startswith("#"): # Comment pass elif len(stack) == 0: # New level name, level = line.split() stack.append((name, level.rstrip(":"))) elif len(stack) == 1: # New commodity stack.append(tuple(line.split())) elif len(stack) == 2: if _depth(line) == 0 and line.strip() == "*": # End of level stack = [] elif _depth(line) == 1: # New commodity stack[-1] = tuple(line.split()) else: p_c = line.strip().rstrip(":") if p_c in ("Producers", "Consumers"): # Start of P/C block stack.append(p_c) pc_data = [] elif p_c == "*": # Consecutive '*' pass elif len(stack) == 3: if _depth(line) == 2 and line.strip() == "*": # End of block # Store data if len(pc_data): key = tuple([stack[0][0], stack[1][0], stack[2]]) self.data[key] = pc_data stack.pop(-1) else: # Data line # TODO parse: tec, level, code, commodity, {ts,c}, [data] pc_data.append(line.split()) elif line == "*\n": stack.pop(-1)
[docs]class DICFile: """Reader for MESSAGE V ``.dic`` files.""" tec_code = {} code_tec = {} def __init__(self, path=None): if path is None: return for line in open(path): if line.startswith("#"): continue tec, code = line.split() self.tec_code[tec] = code self.code_tec[code] = tec def __getitem__(self, key): try: return self.code_tec[key] except KeyError: return self.tec_code[key]
[docs]class INPFile: """Reader for MESSAGE V ``.inp`` files.""" index = {} file = None years_re = re.compile(r"^timesteps:(( \d*)*)", re.MULTILINE) def __init__(self, path): self.file = open(path) # Index the file section = "_info" pos = self.file.tell() while True: line = self.file.readline() if line == "": break elif line == "*\n": self.index[section] = (pos, self.file.tell() - pos) section = None pos = self.file.tell() elif section is None: section = line.split()[0] def get_section(self, name): start, len = self.index[name] self.file.seek(start) return self.file.read(len)
[docs] @lru_cache(1) def get_years(self): """Return timesteps.""" sec = self.get_section("_info") match = self.years_re.search(sec) return list(map(int, match.groups()[0].strip().split()))
params_with_source = "con1a con1c con2a inp minp moutp" ts_params = "ctime fom inv plf pll vom" + params_with_source scalar_params = { "annualize": int, "display": str, "fyear": int, "lyear": int, "hisc": float, "minp": float, } def const_or_ts(self, line): param = line.pop(0) source = line.pop(0) if param in self.params_with_source else None if param == "minp": line = ["c" if len(line) == 1 else "ts"] + line kind = line.pop(0) if kind == "ts": elem = list(zip(self.get_years(), line)) elif kind == "c": assert len(line) == 1 # # This line implements a fill-forward: # elem = [(year, line[0]) for year in self.get_years()] # Single element elem = [(self.get_years()[0], line[0])] else: raise ValueError(param, source, kind, line) # 'free' is a special value for bounds/constraints df = ( pd.DataFrame(elem, columns=["year", "value"]) .replace("free", np.nan) .astype({"value": float}) ) # Add parameter name and source df["param"] = param df["source"] = source return df def parse_section(self, name): result = {} params = [] # Parse each line for line in map(str.split, self.get_section(name).split("\n")): if line in ([], ["*"]) or line[0].startswith("#"): # End of section, comment, or blank line continue param = line[0] if param == name: # Start of the section result["extra"] = line[1:] elif param in "bda bdc": result["type"] = line.pop(1) # 'lo' or 'hi' params.append(self.const_or_ts(line)) # elif param in 'mpa mpc': # # TODO implement this # continue elif param in self.ts_params: params.append(self.const_or_ts(line)) elif param in self.scalar_params: assert len(line) == 2, line result[param] = self.scalar_params[param](line[1]) # Concatenate accumulated params to a single DataFrame if len(params): result["params"] = pd.concat(params, sort=False) return result