Source code for respy.clsRespy

import atexit
import copy
import os
import pickle as pkl

import numpy as np
import pandas as pd

from respy.custom_exceptions import UserError
from respy.fortran.interface import resfort_interface
from respy.pre_processing.data_processing import process_dataset
from respy.pre_processing.model_checking import check_model_attributes
from respy.pre_processing.model_checking import check_model_solution
from respy.pre_processing.model_processing import process_model_spec
from respy.pre_processing.model_processing import write_out_model_spec
from respy.python.interface import respy_interface
from respy.python.record.record_estimation import record_estimation_sample
from respy.python.shared.shared_auxiliary import add_solution
from respy.python.shared.shared_auxiliary import dist_class_attributes
from respy.python.shared.shared_auxiliary import distribute_parameters
from respy.python.shared.shared_auxiliary import get_est_info
from respy.python.shared.shared_auxiliary import remove_scratch
from respy.python.shared.shared_auxiliary import replace_missing_values
from respy.python.shared.shared_constants import DATA_FORMATS_SIM
from respy.python.shared.shared_constants import DATA_LABELS_SIM
from respy.python.shared.shared_constants import OPT_EST_FORT
from respy.python.shared.shared_constants import OPT_EST_PYTH
from respy.python.simulate.simulate_auxiliary import check_dataset_sim
from respy.python.simulate.simulate_auxiliary import write_info
from respy.python.simulate.simulate_auxiliary import write_out


[docs]class RespyCls(object): """Class that defines a model in respy. """ def __init__(self, params_spec, options_spec): self._set_hardcoded_attributes() self.attr = process_model_spec(params_spec, options_spec) self._update_derived_attributes() self._initialize_solution_attributes() self.attr["is_locked"] = False self.attr["is_solved"] = False self.lock() def _set_hardcoded_attributes(self): """Set attributes that can't be changed by the model specification.""" self.derived_attributes = ["is_myopic", "num_paras"] self.solution_attributes = [ "periods_rewards_systematic", "states_number_period", "mapping_state_idx", "periods_emax", "states_all", "state_space", ] def _initialize_solution_attributes(self): """Initialize solution attributes to None.""" for attribute in self.solution_attributes: self.attr[attribute] = None def update_optim_paras(self, x_econ): """Update model parameters.""" x_econ = copy.deepcopy(x_econ) self.reset() new_paras_dict = distribute_parameters( paras_vec=x_econ, is_debug=True, paras_type="econ" ) self.attr["optim_paras"].update(new_paras_dict) def lock(self): """Lock class instance.""" assert not self.attr[ "is_locked" ], "Only unlocked instances of clsRespy can be locked." self._update_derived_attributes() self._check_model_attributes() # ==================================================================== # todo: reimplement checks for python solution # ==================================================================== if self.attr["version"] == "fortran": self._check_model_solution() # ==================================================================== self.attr["is_locked"] = True def unlock(self): """Unlock class instance.""" assert self.attr[ "is_locked" ], "Only locked instances of clsRespy can be unlocked." self.attr["is_locked"] = False def get_attr(self, key): """Get attributes.""" assert self.attr["is_locked"] self._check_key(key) if key in self.solution_attributes: assert self.get_attr("is_solved"), "invalid request" return self.attr[key] def set_attr(self, key, value): """Set attributes.""" assert not self.attr["is_locked"] self._check_key(key) invalid_attr = self.derived_attributes + ["optim_paras", "init_dict"] if key in invalid_attr: raise AssertionError("{} must not be modified by users!".format(key)) if key in self.solution_attributes: assert not self.attr[ "is_solved" ], "Solution attributes can only be set if model is not solved." self.attr[key] = value self._update_derived_attributes() def store(self, file_name): """Store class instance.""" assert self.attr["is_locked"] assert isinstance(file_name, str) pkl.dump(self, open(file_name, "wb")) def write_out(self, fname="model.respy"): """Write out the implied initialization file of the class instance.""" write_out_model_spec(self.attr, fname) def reset(self): """Remove solution attributes from class instance.""" for label in self.solution_attributes: self.attr[label] = None self.attr["is_solved"] = False def check_equal_solution(self, other): """Compare two class instances for equality of solution attributes.""" assert isinstance(other, RespyCls) for key_ in self.solution_attributes: try: np.testing.assert_almost_equal(self.attr[key_], other.attr[key_]) except AssertionError: return False return True def _update_derived_attributes(self): """Update derived attributes.""" # note: don't remove the conversion to bool. It seems unnecessary but it # converts a numpy bool to python bool. self.attr["is_myopic"] = bool((self.attr["optim_paras"]["delta"] == 0.00)[0]) def _check_model_attributes(self): """Check integrity of class instance. This testing is done the first time the class is locked and if the package is running in debug mode. """ check_model_attributes(self.attr) def _check_model_solution(self): """Check the integrity of the results.""" check_model_solution(self.attr) def _check_key(self, key): """Check that key is present.""" assert key in self.attr.keys(), "Invalid key requested: {}".format(key) def check_estimation(self): """Check model attributes that are only relevant for estimation tasks.""" # Check that class instance is locked. assert self.get_attr("is_locked") # Check that no other estimations are currently running in this directory. assert not os.path.exists(".estimation.respy.scratch") # Distribute class attributes ( optimizer_options, optimizer_used, optim_paras, version, maxfun, num_paras, file_est, ) = dist_class_attributes( self, "optimizer_options", "optimizer_used", "optim_paras", "version", "maxfun", "num_paras", "file_est", ) # Ensure that at least one parameter is free. if sum(optim_paras["paras_fixed"]) == num_paras: raise UserError("Estimation requires at least one free parameter") # Make sure the estimation dataset exists if not os.path.exists(file_est): raise UserError("Estimation dataset does not exist") if maxfun > 0: assert optimizer_used in optimizer_options.keys() # Make sure the requested optimizer is valid if version == "python": assert optimizer_used in OPT_EST_PYTH elif version == "fortran": assert optimizer_used in OPT_EST_FORT else: raise AssertionError return self def fit(self): """Estimate the model.""" # Cleanup for fname in ["est.respy.log", "est.respy.info"]: if os.path.exists(fname): os.unlink(fname) if self.get_attr("is_solved"): self.reset() self.check_estimation() # This locks the estimation directory for additional estimation requests. atexit.register(remove_scratch, ".estimation.respy.scratch") open(".estimation.respy.scratch", "w").close() # Read in estimation dataset. It only reads in the number of agents # requested for the estimation (or all available, depending on which is # less). It allows to read in only a subset of the initial conditions. data_frame = process_dataset(self) record_estimation_sample(data_frame) # Distribute class attributes version = self.get_attr("version") data_array = data_frame.to_numpy() # Select appropriate interface if version in ["python"]: respy_interface(self, "estimate", data_frame) elif version in ["fortran"]: resfort_interface(self, "estimate", data_array) else: raise NotImplementedError rslt = get_est_info() x, val = rslt["paras_step"], rslt["value_step"] for fname in [".estimation.respy.scratch", ".stop.respy.scratch"]: remove_scratch(fname) return x, val def simulate(self): """Simulate dataset of synthetic agents following the model.""" # Distribute class attributes is_debug, version, is_store, file_sim = dist_class_attributes( self, "is_debug", "version", "is_store", "file_sim" ) # Cleanup for ext in ["sim", "sol", "dat", "info"]: fname = file_sim + ".respy." + ext if os.path.exists(fname): os.unlink(fname) # Select appropriate interface if version in ["python"]: state_space, data_array = respy_interface(self, "simulate") elif version in ["fortran"]: solution, data_array = resfort_interface(self, "simulate") else: raise NotImplementedError # Attach solution to class instance if version == "fortran": self = add_solution(self, *solution) elif version == "python": self.unlock() self.set_attr("state_space", state_space) self.lock() ( states_all, mapping_state_idx, periods_rewards_systematic, periods_emax, ) = state_space._get_fortran_counterparts() self = add_solution( self, periods_rewards_systematic, state_space.states_per_period, mapping_state_idx, periods_emax, states_all, ) else: raise NotImplementedError self.unlock() self.set_attr("is_solved", True) self.lock() # Store object to file if is_store: self.store("solution.respy.pkl") # ==================================================================== # todo: harmonize python and fortran # ==================================================================== if self.attr["version"] == "python": data_frame = data_array[DATA_LABELS_SIM] elif self.attr["version"] == "fortran": data_frame = pd.DataFrame( data=replace_missing_values(data_array), columns=DATA_LABELS_SIM ) else: raise NotImplementedError data_frame = data_frame.astype(DATA_FORMATS_SIM) # ==================================================================== data_frame.set_index(["Identifier", "Period"], drop=False, inplace=True) # Checks if is_debug: check_dataset_sim(data_frame, self) write_out(self, data_frame) write_info(self, data_frame) return self, data_frame