Source code for respy.pre_processing.model_checking
"""Everything related to validate the model."""
import numba as nb
import numpy as np
[docs]def validate_options(o):
"""Validate the options provided by the user."""
assert _is_positive_nonzero_integer(o["n_periods"])
for option, value in o.items():
if "draws" in option:
assert _is_positive_nonzero_integer(value)
elif option.endswith("_seed"):
assert _is_nonnegative_integer(value)
assert 0 < o["estimation_tau"]
assert (
_is_positive_nonzero_integer(o["interpolation_points"])
or o["interpolation_points"] == -1
)
assert _is_positive_nonzero_integer(o["simulation_agents"])
assert isinstance(o["core_state_space_filters"], list) and all( # noqa: PT018
isinstance(filter_, str) for filter_ in o["core_state_space_filters"]
)
assert isinstance(o["negative_choice_set"], dict) and all( # noqa: PT018
isinstance(key, str)
and isinstance(val, list)
and all(isinstance(condition, str) for condition in val)
for key, val in o["negative_choice_set"].items()
)
assert o["monte_carlo_sequence"] in ["random", "halton", "sobol"]
[docs]def validate_params(params, optim_paras):
"""Validate params."""
_validate_shocks(params, optim_paras)
[docs]def _validate_shocks(params, optim_paras):
"""Validate that the elements of the shock matrix are correctly sorted."""
choices = list(optim_paras["choices"])
if "shocks_sdcorr" in params.index:
sds_flat = [f"sd_{c}" for c in choices]
corrs_flat = []
for i, c_1 in enumerate(choices):
for c_2 in choices[: i + 1]:
if c_1 == c_2:
pass
else:
corrs_flat.append(f"corr_{c_1}_{c_2}")
index = sds_flat + corrs_flat
else:
index = []
for i, c_1 in enumerate(choices):
for c_2 in choices[: i + 1]:
if c_1 == c_2:
label = "var" if "shocks_cov" in params.index else "chol"
index.append(f"{label}_{c_1}")
else:
label = "cov" if "shocks_cov" in params.index else "chol"
index.append(f"{label}_{c_1}_{c_2}")
assert all(
params.filter(regex=r"shocks_(sdcorr|cov|chol)", axis=0).index.get_level_values(
"name"
)
== index
), f"Reorder the 'name' index of the shock matrix to {index}."
[docs]def _is_positive_nonzero_integer(x):
return isinstance(x, (int, np.integer)) and x > 0
[docs]def _is_nonnegative_integer(x):
return isinstance(x, (int, np.integer)) and x >= 0
[docs]def check_model_solution(optim_paras, options, state_space):
"""Check properties of the solution of a model."""
# Distribute class attributes
choices = optim_paras["choices"]
max_initial_experience = np.array(
[max(choices[choice]["start"]) for choice in optim_paras["choices_w_exp"]]
)
n_periods = options["n_periods"]
# Check period.
assert np.all(np.isin(state_space.core.period, range(n_periods)))
# The sum of years of experiences cannot be larger than constraint time.
assert np.all(
state_space.core[[f"exp_{c}" for c in optim_paras["choices_w_exp"]]].sum(axis=1)
<= (state_space.core.period + max_initial_experience.sum())
)
# Choice experience cannot exceed the time frame.
for choice in optim_paras["choices_w_exp"]:
assert state_space.core[f"exp_{choice}"].le(choices[choice]["max"]).all()
# Lagged choices are always in ``range(n_choices)``.
if optim_paras["n_lagged_choices"]:
assert np.all(
state_space.core.filter(regex=r"\blagged_choice_[0-9]*\b").isin(
range(len(choices))
)
)
assert np.all(np.isfinite(state_space.core.select_dtypes(exclude=np.bool)))
# Check for duplicate rows in each period. We only have possible duplicates if there
# are multiple initial conditions.
assert not state_space.core.duplicated().any()
# Check that we have as many indices as states.
n_valid_indices = len(state_space.indexer)
assert state_space.core.shape[0] == n_valid_indices
# Check finiteness of rewards and emaxs.
finite_wages = _apply_to_attribute_of_state_space(state_space.wages, np.isfinite)
for arr in finite_wages:
assert np.all(arr)
finite_nonpecs = _apply_to_attribute_of_state_space(
state_space.nonpecs, np.isfinite
)
for arr in finite_nonpecs:
assert np.all(arr)
finite_vf = _apply_to_attribute_of_state_space(
state_space.expected_value_functions, np.isfinite
)
for arr in finite_vf:
assert np.all(arr)
[docs]def _apply_to_attribute_of_state_space(attribute, func):
"""Apply a function to a state space attribute which might be dense or not.
Attribute might be `state_space.wages` which can be a dictionary or a Numpy array.
"""
if isinstance(attribute, (dict, nb.typed.Dict)):
out = [func(val) for val in attribute.values()]
else:
out = func(attribute)
return out