Source code for respy.state_space

"""Everything related to the state space of a structural model."""
import itertools

import numba as nb
import numpy as np
import pandas as pd
from numba.typed import Dict

from respy._numba import sum_over_numba_boolean_unituple
from respy.exogenous_processes import create_transit_choice_set
from respy.exogenous_processes import create_transition_objects
from respy.exogenous_processes import weight_continuation_values
from respy.parallelization import parallelize_across_dense_dimensions
from respy.shared import apply_law_of_motion_for_core
from respy.shared import compute_covariates
from respy.shared import convert_dictionary_keys_to_dense_indices
from respy.shared import create_base_draws
from respy.shared import create_core_state_space_columns
from respy.shared import create_dense_state_space_columns
from respy.shared import downcast_to_smallest_dtype
from respy.shared import dump_objects
from respy.shared import load_objects
from respy.shared import map_states_to_core_key_and_core_index
from respy.shared import prepare_cache_directory
from respy.shared import return_core_dense_key


def create_state_space_class(optim_paras, options):
    """Create the state space of the model."""
    prepare_cache_directory(options)
    core = _create_core_state_space(optim_paras, options)
    dense_grid = _create_dense_state_space_grid(optim_paras)

    # Downcast after calculations or be aware of silent integer overflows.
    core = compute_covariates(core, options["covariates_core"])
    core = core.apply(downcast_to_smallest_dtype)
    dense = _create_dense_state_space_covariates(dense_grid, optim_paras, options)

    core_period_choice = _create_core_period_choice(core, optim_paras, options)

    core_key_to_complex = dict(enumerate(core_period_choice))
    core_key_to_core_indices = {
        i: core_period_choice[complex_] for i, complex_ in core_key_to_complex.items()
    }

    indexer = _create_indexer(core, core_key_to_core_indices, optim_paras)

    dense_period_choice = _create_dense_period_choice(
        core, dense, core_key_to_core_indices, core_key_to_complex, optim_paras, options
    )

    state_space = StateSpace(
        core,
        indexer,
        dense,
        dense_period_choice,
        core_key_to_complex,
        core_key_to_core_indices,
        optim_paras,
        options,
    )

    return state_space

class StateSpace:
    """The state space of a structural model.

    Attributes
    ----------
    core : pandas.DataFrame
        The core state space is a :class:`pandas.DataFrame` that contains all states
        of core dimensions. A core dimension is a dimension whose value is uniquely
        determined by past choices and time. Core dimensions include choices,
        experiences, lagged choices and periods.
    dense_key_to_core_indices : Dict[int, Array[int]]
        A mapping from dense keys to ``.loc`` locations in the ``core``.

    """

    def __init__(
        self,
        core,
        indexer,
        dense,
        dense_period_cores,
        core_key_to_complex,
        core_key_to_core_indices,
        optim_paras,
        options,
    ):
        """Initialize the state space.

        See :ref:`state space location indices <state_space_location_indices>` for an
        explanation of the dict objects.

        Parameters
        ----------
        core : pandas.DataFrame
            DataFrame containing one core state per row.
        indexer : numba.typed.Dict
            Maps states (rows of core) into tuples containing the core key and the
            core index.
            i : state -> (core_key, core_index)
        dense : dict
            Maps dense states into dense covariates.
        dense_period_cores : dict
            Maps period, choice_set and dense_index into core_key.
            d : (period, choice_set, dense_index) -> core_key
        core_key_to_complex : dict
            Maps core_key into the pair of period and choice_set.
        core_key_to_core_indices : dict
            Maps core_keys into core_indices.

        """
        self.core = core
        self.indexer = indexer
        self.dense_period_cores = dense_period_cores
        self.dense = dense
        self.core_key_to_complex = core_key_to_complex
        self.core_key_to_core_indices = core_key_to_core_indices
        self.optim_paras = optim_paras
        self.options = options
        self.n_periods = options["n_periods"]
        self._create_conversion_dictionaries()
        self.base_draws_sol = self.create_draws(options)
        self.create_arrays_for_expected_value_functions()
        if len(self.optim_paras["exogenous_processes"]) > 0:
            self.create_objects_for_exogenous_processes()
        self.child_indices = self.collect_child_indices()

    def _create_conversion_dictionaries(self):
        """Create mappings between state space location indices and properties.

        See :ref:`state space location indices <state_space_location_indices>`.

        """
        self.dense_key_to_complex = {
            i: k for i, k in enumerate(self.dense_period_cores)
        }

        self.dense_key_to_core_key = {
            i: self.dense_period_cores[self.dense_key_to_complex[i]]
            for i in self.dense_key_to_complex
        }

        self.dense_key_to_choice_set = {
            i: self.dense_key_to_complex[i][1] for i in self.dense_key_to_complex
        }

        self.dense_key_to_core_indices = {
            i: np.array(self.core_key_to_core_indices[self.dense_key_to_core_key[i]])
            for i in self.dense_key_to_complex
        }

        self.core_key_and_dense_index_to_dense_key = Dict.empty(
            key_type=nb.types.UniTuple(nb.types.int64, 2),
            value_type=nb.types.int64,
        )

        for i in self.dense_key_to_complex:
            self.core_key_and_dense_index_to_dense_key[
                return_core_dense_key(
                    self.dense_key_to_core_key[i],
                    *self.dense_key_to_complex[i][2:],
                )
            ] = i

        if self.dense is False:
            self.dense_covariates_to_dense_index = {}
            self.dense_key_to_dense_covariates = {
                i: {} for i in self.dense_key_to_complex
            }
        else:
            n_dense = len(create_dense_state_space_columns(self.optim_paras))
            self.dense_covariates_to_dense_index = Dict.empty(
                key_type=nb.types.UniTuple(nb.types.int64, n_dense),
                value_type=nb.types.int64,
            )
            for i, k in enumerate(self.dense):
                self.dense_covariates_to_dense_index[k] = i

            self.dense_key_to_dense_covariates = {
                i: list(self.dense.keys())[self.dense_key_to_complex[i][2]]
                for i in self.dense_key_to_complex
            }

    def create_arrays_for_expected_value_functions(self):
        """Create a container for expected value functions."""
        self.expected_value_functions = Dict.empty(
            key_type=nb.types.int64, value_type=nb.types.float64[:]
        )

        for index, indices in self.dense_key_to_core_indices.items():
            self.expected_value_functions[index] = np.zeros(len(indices))

    def create_objects_for_exogenous_processes(self):
        """Create mappings for the implementation of the exogenous processes."""
        # Include switch arg
        exogenous_processes = self.optim_paras["exogenous_processes"]
        n_exog = len(exogenous_processes)

        # How does the accounting work here again? Would that actually work?
        levels_of_processes = [range(len(i)) for i in exogenous_processes.values()]
        self.exogenous_grid = list(itertools.product(*levels_of_processes))

        self.dense_key_to_transit_keys = create_transition_objects(
            self.dense_key_to_dense_covariates,
            self.dense_key_to_core_key,
            self.exogenous_grid,
            n_exog,
            bypass={
                "dense_covariates_to_dense_index": self.dense_covariates_to_dense_index,
                "core_key_and_dense_index_to_dense_key": self.core_key_and_dense_index_to_dense_key,  # noqa: E501
            },
        )

        self.transit_key_to_choice_set = create_transit_choice_set(
            self.dense_key_to_transit_keys, self.dense_key_to_choice_set
        )

    def get_continuation_values(self, period):
        """Get continuation values.

        The function takes the expected value functions of the subsequent period,
        which were computed in a previous step of the backward induction, and then
        uses the indices of child states to put these expected value functions in the
        correct format. If ``period`` is equal to ``self.n_periods - 1``, the function
        returns arrays of zeros since we are in terminal states. Otherwise, we
        retrieve the expected value functions of the next period and call
        :func:`_get_continuation_values` to assign continuation values to all choices
        within a period.

        (The object ``subset_expected_value_functions`` is required because we need a
        Numba typed dict, but :meth:`StateSpace.get_attribute_from_period` just
        returns a normal dict.)

        Returns
        -------
        continuation_values : numba.typed.Dict
            The continuation values for each dense key in a :class:`numpy.ndarray`.

        See also
        --------
        _get_continuation_values
            A more theoretical explanation can be found in :ref:`get continuation
            values <get_continuation_values>`.

        """
        if period == self.n_periods - 1:
            shapes = self.get_attribute_from_period("base_draws_sol", period)
            states = self.get_attribute_from_period("dense_key_to_core_indices", period)
            continuation_values = {
                key: np.zeros((states[key].shape[0], shapes[key].shape[1]))
                for key in shapes
            }
        else:
            child_indices = self.get_attribute_from_period("child_indices", period)
            expected_value_functions = self.get_attribute_from_period(
                "expected_value_functions", period + 1
            )

            subset_expected_value_functions = Dict.empty(
                key_type=nb.types.int64, value_type=nb.types.float64[:]
            )

            for key, value in expected_value_functions.items():
                subset_expected_value_functions[key] = value

            transit_choice_sets = (
                "transit_key_to_choice_set"
                if hasattr(self, "transit_key_to_choice_set")
                else "dense_key_to_choice_set"
            )

            continuation_values = _get_continuation_values(
                self.get_attribute_from_period("dense_key_to_complex", period),
                self.get_attribute_from_period(transit_choice_sets, period),
                self.get_attribute_from_period("dense_key_to_core_indices", period),
                child_indices,
                self.core_key_and_dense_index_to_dense_key,
                bypass={"expected_value_functions": subset_expected_value_functions},
            )

            if len(self.optim_paras["exogenous_processes"]) > 0:
                continuation_values = weight_continuation_values(
                    self.get_attribute_from_period("dense_key_to_complex", period),
                    self.options,
                    bypass={
                        "continuation_values": continuation_values,
                        "transit_key_to_choice_set": self.get_attribute_from_period(
                            transit_choice_sets, period
                        ),
                    },
                )

        return continuation_values

    def collect_child_indices(self):
        """Collect for each state the indices of its child states.

        To collect continuation values, one needs to find the child states. For every
        state except those in the last period, this function finds the possible
        successor states implied by the law of motion for each available choice.

        See also
        --------
        _collect_child_indices
            A more theoretical explanation can be found in :ref:`collect child indices
            <collect_child_indices>`.

        """
        if self.n_periods == 1:
            child_indices = None
        else:
            dense_key_to_complex_except_last_period = {
                k: v
                for k, v in self.dense_key_to_complex.items()
                if v[0] < self.n_periods - 1
            }

            transit_choice_sets = (
                "transit_key_to_choice_set"
                if hasattr(self, "transit_key_to_choice_set")
                else "dense_key_to_choice_set"
            )

            dense_key_to_choice_set_except_last_period = {
                k: getattr(self, transit_choice_sets)[k]
                for k in dense_key_to_complex_except_last_period
            }

            child_indices = _collect_child_indices(
                dense_key_to_complex_except_last_period,
                dense_key_to_choice_set_except_last_period,
                self.indexer,
                self.optim_paras,
                self.options,
            )

        return child_indices

    def create_draws(self, options):
        """Get draws."""
        n_choices_in_sets = list(set(map(sum, self.dense_key_to_choice_set.values())))
        shocks_sets = []

        for n_choices in n_choices_in_sets:
            draws = create_base_draws(
                (options["n_periods"], options["solution_draws"], n_choices),
                next(options["solution_seed_startup"]),
                options["monte_carlo_sequence"],
            )
            shocks_sets.append(draws)

        draws = {}
        for dense_idx, complex_ix in self.dense_key_to_complex.items():
            period = complex_ix[0]
            n_choices = sum(complex_ix[1])
            idx = n_choices_in_sets.index(n_choices)
            draws[dense_idx] = shocks_sets[idx][period]

        return draws

    def get_dense_keys_from_period(self, period):
        """Get dense indices from one period."""
        return [
            dense_index
            for dense_index, complex_ in self.dense_key_to_complex.items()
            if complex_[0] == period
        ]

    def get_attribute_from_period(self, attribute, period):
        """Get an attribute of the state space sliced to a given period.

        Parameters
        ----------
        attribute : str
            Attribute name, e.g. ``"states"`` to retrieve ``self.states``.
        period : int
            Attribute is retrieved from this period.

        """
        dense_indices_in_period = self.get_dense_keys_from_period(period)

        return {
            dense_index: attr
            for dense_index, attr in getattr(self, attribute).items()
            if dense_index in dense_indices_in_period
        }

    def set_attribute_from_keys(self, attribute, value):
        """Set attributes by keys.

        This function allows one to modify the period part of a certain state space
        object, i.e. to set values for all dense period choice cores within one
        period. During the model solution, this method, called for period
        :math:`t + 1`, communicates with :meth:`get_continuation_values` in period
        :math:`t`.

        Note that the values are changed in-place.

        Parameters
        ----------
        attribute : str
            The name of the state space attribute which is changed in-place.
        value : dict
            Maps dense keys to the :class:`numpy.ndarray` to which the corresponding
            entry of the attribute is set.

        """
        for key in value:
            getattr(self, attribute)[key][:] = value[key]

def _create_core_state_space(optim_paras, options):
    """Create the core state space.

    The state space of the model consists of all feasible combinations of the period,
    experiences, lagged choices and types. Creating the state space involves two
    steps. First, the core state space is created which abstracts from levels of
    initial experiences and instead uses the minimum initial experience per choice.
    Secondly, the state space is adjusted by all combinations of initial experiences
    and also filtered, excluding invalid states.

    Notes
    -----
    Here are some details on the implementation.

    - In the process of creating this function, we came up with several different
      ideas. Basically, there are two extreme approaches to finding all valid states
      in the state space. First, all combinations of state attributes are created and
      only valid states are selected afterwards. The problem with this approach is
      that the state space is extremely sparse. The number of combinations created by
      using ``itertools.product`` or ``np.meshgrid`` is much higher than the number of
      valid states. Because of that, we ran into memory or runtime problems which
      seemed unsolvable.

      The second approach is closer to the actual process where states are created by
      incrementing experiences from period to period. In an extreme case, a function
      mimics an agent in one period and recursively creates updates of itself in
      future periods. Using this approach, we ran into the Python recursion limit and
      runtime problems, but it might be feasible.

      These two approaches frame the search for a solution: filtering is applied
      either after creating a massive amount of candidate states or before creating
      states. A practical solution must take into account that some restrictions to
      the state space are more important than others and should be applied earlier.
      Others can be delayed.

      As a compromise, we built on the former approach in
      :func:`~respy.tests._former_code._create_state_space_kw94` which loops over
      choices and possible experience values. Thus, it incorporates some fundamental
      restrictions like time limits and needs less filtering.

    - The former implementation,
      :func:`~respy.tests._former_code._create_state_space_kw94`, had four hard-coded
      choices and a loop for every choice with experience accumulation. Thus, that
      function cannot be used if the model requires more or fewer choices. For each
      number of choices with and without experience, a new function had to be
      programmed. The following approach uses the same loops over choices with
      experiences, but they are dynamically created by the recursive function
      :func:`_create_core_state_space_per_period`.

    - There are characteristics of the state space which are independent of all other
      state space attributes, like types (and, to a large extent, lagged choices).
      These attributes only duplicate the existing state space and can be taken into
      account at a later stage of the process.

    See also
    --------
    _create_core_from_choice_experiences
    _create_core_state_space_per_period
    _filter_core_state_space
    _add_initial_experiences_to_core_state_space
    _create_indexer

    """
    core = _create_core_from_choice_experiences(optim_paras)
    core = _add_lagged_choice_to_core_state_space(core, optim_paras)
    core = _filter_core_state_space(core, options)
    core = _add_initial_experiences_to_core_state_space(core, optim_paras)
    core = core.sort_values("period").reset_index(drop=True)

    return core

def _create_core_from_choice_experiences(optim_paras):
    """Create the core state space from choice experiences.

    The core state space abstracts from initial experiences and uses the maximum range
    between initial experiences and maximum experiences to cover the whole range. The
    combinations of initial experiences are applied later in
    :func:`_add_initial_experiences_to_core_state_space`.

    See also
    --------
    _create_core_state_space_per_period

    """
    choices_w_exp = list(optim_paras["choices_w_exp"])

    minimal_initial_experience = np.array(
        [min(optim_paras["choices"][choice]["start"]) for choice in choices_w_exp],
        dtype=np.uint8,
    )
    maximum_exp = np.array(
        [optim_paras["choices"][choice]["max"] for choice in choices_w_exp],
        dtype=np.uint8,
    )
    additional_exp = maximum_exp - minimal_initial_experience

    exp_cols = [f"exp_{choice}" for choice in choices_w_exp]

    container = []
    for period in np.arange(optim_paras["n_periods"], dtype=np.uint8):
        data = _create_core_state_space_per_period(
            period,
            additional_exp,
            optim_paras,
            np.zeros(len(choices_w_exp), dtype=np.uint8),
        )
        df_ = pd.DataFrame.from_records(data, columns=exp_cols)
        df_.insert(0, "period", period)
        container.append(df_)

    df = pd.concat(container, axis="rows", sort=False)

    return df

def _create_core_state_space_per_period(
    period, additional_exp, optim_paras, experiences, pos=0
):
    """Create core state space per period.

    First, this function returns a state combined with all possible lagged choices and
    types. Secondly, if there exists a choice with experience in
    ``additional_exp[pos]``, loop over all admissible experiences, update the state and
    pass it to the same function, but moving to the next choice which accumulates
    experience.

    Parameters
    ----------
    period : int
        Number of the period.
    additional_exp : numpy.ndarray
        Array with shape (n_choices_w_exp,) containing integers representing the
        additional experience per choice which is admissible. This is the difference
        between the maximum experience and the minimum initial experience per choice.
    experiences : numpy.ndarray
        Array with shape (n_choices_w_exp,) which contains the current experience of
        the state.
    pos : int, default 0
        Index of the current choice with experience. If the index is valid for the
        array ``experiences``, then loop over all admissible experience levels of this
        choice. Otherwise, ``experiences[pos]`` would lead to an :exc:`IndexError`.

    """
    # Return experiences combined with lagged choices and types.
    yield experiences

    # Check if there is an additional choice left to start another loop.
    if pos < experiences.shape[0]:
        # Upper bound of additional experience is given by the remaining time or the
        # maximum experience which can be accumulated in experience[pos].
        remaining_time = period - experiences.sum()
        max_experience = additional_exp[pos]

        # +1 is necessary so that the remaining time or max_experience is exhausted.
        for i in np.arange(min(remaining_time, max_experience) + 1, dtype=np.uint8):
            # Update experiences and call the same function with the next choice.
            updated_experiences = experiences.copy()
            updated_experiences[pos] += i
            yield from _create_core_state_space_per_period(
                period, additional_exp, optim_paras, updated_experiences, pos + 1
            )

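# Illustrative sketch (hypothetical two-choice model, not part of respy): the
# generator above reaches exactly the experience vectors whose total does not exceed
# the period. ``optim_paras`` is only passed through the recursion, so an empty dict
# suffices for this toy call.
#
#     >>> exps = _create_core_state_space_per_period(
#     ...     1, np.array([2, 2], dtype=np.uint8), {}, np.zeros(2, dtype=np.uint8)
#     ... )
#     >>> sorted({tuple(map(int, e)) for e in exps})
#     [(0, 0), (0, 1), (1, 0)]
#
# Some vectors are yielded more than once; those duplicates are removed later by
# ``drop_duplicates`` in ``_add_initial_experiences_to_core_state_space``.
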
def _add_lagged_choice_to_core_state_space(df, optim_paras):
    """Duplicate the core state space for each lag and possible lagged choice."""
    container = []
    for lag in range(1, optim_paras["n_lagged_choices"] + 1):
        for choice_code in range(len(optim_paras["choices"])):
            df_ = df.copy()
            df_[f"lagged_choice_{lag}"] = choice_code
            container.append(df_)

    df = pd.concat(container, axis="rows", sort=False) if container else df

    return df

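# Illustrative sketch (hypothetical model, not part of respy): with three choices and
# one lagged choice, every core row is repeated once per choice code.
#
#     >>> base = pd.DataFrame({"period": [0], "exp_a": [0]})
#     >>> paras = {"n_lagged_choices": 1, "choices": {"a": {}, "b": {}, "c": {}}}
#     >>> _add_lagged_choice_to_core_state_space(base, paras)["lagged_choice_1"].tolist()
#     [0, 1, 2]
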
def _filter_core_state_space(df, options):
    """Apply filters to the core state space.

    Sometimes, we want to apply filters to a group of choices. Thus, use the following
    shortcuts.

    - ``i`` is replaced with every choice with experience.
    - ``j`` is replaced with every choice without experience.
    - ``k`` is replaced with every choice with a wage.

    Parameters
    ----------
    df : pandas.DataFrame
    options : dict

    """
    for definition in options["core_state_space_filters"]:
        df = df.loc[~df.eval(definition)]

    return df

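# Illustrative sketch (hypothetical filter string, not taken from a respy model spec):
# each entry of ``options["core_state_space_filters"]`` is evaluated with
# ``pandas.DataFrame.eval`` and every row for which it returns True is dropped. For
# example, a filter such as
#
#     "period == 0 and lagged_choice_1 != 0"
#
# would remove all first-period states whose first lagged choice is not choice code 0.
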
def _add_initial_experiences_to_core_state_space(df, optim_paras):
    """Add initial experiences to the core state space.

    As the core state space abstracts from differences in initial experiences, this
    function loops through all combinations of initial experiences and adds them to
    the existing experiences. After that, we need to check whether the maximum
    experience is still respected.

    """
    choices = optim_paras["choices"]

    # Create combinations of starting values
    initial_experiences_combinations = itertools.product(
        *(choices[choice]["start"] for choice in optim_paras["choices_w_exp"])
    )

    maximum_exp = np.array(
        [choices[choice]["max"] for choice in optim_paras["choices_w_exp"]]
    )

    exp_cols = df.filter(like="exp_").columns.tolist()

    container = []

    for initial_exp in initial_experiences_combinations:
        df_ = df.copy()

        # Add initial experiences.
        df_[exp_cols] += initial_exp

        # Check that max_experience is still fulfilled.
        df_ = df_.loc[df_[exp_cols].le(maximum_exp).all(axis="columns")].copy()

        container.append(df_)

    df = pd.concat(container, axis="rows", sort=False).drop_duplicates()

    return df

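# Illustrative worked example (hypothetical numbers): for a single experience choice
# with ``start = {0, 3}`` and ``max = 5``, every core state is duplicated, once with 0
# and once with 3 added to its ``exp_*`` column. Copies whose experience would exceed
# 5 are removed again by the ``le(maximum_exp)`` check, and exact duplicates are
# dropped at the end.
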
def _create_dense_state_space_grid(optim_paras):
    """Create a grid of dense variables.

    The function loops through all potential realizations of each dense dimension and
    returns a list of all possible joint realizations of dense variables.

    Parameters
    ----------
    optim_paras : dict
        Contains parsed model parameters.

    Returns
    -------
    dense_state_space_grid : list
        Contains all dense states as tuples.

    """
    levels_of_observables = [range(len(i)) for i in optim_paras["observables"].values()]
    types = [range(optim_paras["n_types"])] if optim_paras["n_types"] >= 2 else []

    dense_state_space_grid = list(itertools.product(*levels_of_observables, *types))
    if dense_state_space_grid == [()]:
        dense_state_space_grid = False

    return dense_state_space_grid

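# Illustrative sketch (hypothetical model): with one observable that has two levels
# and two unobserved types, the grid is the Cartesian product of the level indices.
#
#     >>> paras = {"observables": {"health": [0, 1]}, "n_types": 2}
#     >>> _create_dense_state_space_grid(paras)
#     [(0, 0), (0, 1), (1, 0), (1, 1)]
#
# Without observables and with fewer than two types the product collapses to ``[()]``
# and the function returns ``False`` instead.
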
def _create_dense_state_space_covariates(dense_grid, optim_paras, options):
    """Obtain covariates for all dense states."""
    if dense_grid:
        columns = create_dense_state_space_columns(optim_paras)

        df = pd.DataFrame(data=dense_grid, columns=columns).set_index(
            columns, drop=False
        )

        covariates = compute_covariates(df, options["covariates_dense"])
        covariates = covariates.apply(downcast_to_smallest_dtype)
        covariates = covariates.to_dict(orient="index")
        covariates = convert_dictionary_keys_to_dense_indices(covariates)

    else:
        covariates = False

    return covariates

def create_is_inadmissible(df, optim_paras, options):
    """Compute is_inadmissible for passed states."""
    df = df.copy()

    for choice in optim_paras["choices"]:
        df[f"_{choice}"] = False
        for formula in options["negative_choice_set"][choice]:
            try:
                df[f"_{choice}"] |= df.eval(formula)
            except pd.errors.UndefinedVariableError:
                pass

    return df

def _create_indexer(core, core_key_to_core_indices, optim_paras):
    """Create indexer of core state space.

    Returns
    -------
    indexer : numba.typed.Dict
        Maps a row of the core state space into its position within the
        period_choice_cores.
        c: core_state -> (core_key, core_index)

    """
    core_columns = ["period"] + create_core_state_space_columns(optim_paras)
    n_core_state_variables = len(core_columns)

    indexer = Dict.empty(
        key_type=nb.types.UniTuple(nb.types.int64, n_core_state_variables),
        value_type=nb.types.UniTuple(nb.types.int64, 2),
    )

    for core_idx, indices in core_key_to_core_indices.items():
        states = core.loc[indices, core_columns].to_numpy()
        for i, state in enumerate(states):
            indexer[tuple(state)] = (core_idx, i)

    return indexer

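# Illustrative sketch (hypothetical values): for a model whose core columns are
# ``["period", "exp_a", "lagged_choice_1"]``, the indexer might contain an entry such
# as
#
#     >>> indexer[(2, 1, 0)]
#     (5, 13)
#
# meaning that the state with period 2, one unit of experience in ``a`` and lagged
# choice 0 is row 13 of the period-choice core with core key 5.
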
def _create_core_period_choice(core, optim_paras, options):
    """Create the core separated into period-choice cores.

    Returns
    -------
    core_period_choice : dict
        c: (period, choice_set) -> core_indices

    """
    choices = [f"_{choice}" for choice in optim_paras["choices"]]
    df = core.copy()
    df = create_is_inadmissible(df, optim_paras, options)
    df[choices] = ~df[choices]
    core_period_choice = {
        (idx[0], idx[1:]): indices
        for idx, indices in df.groupby(["period"] + choices).groups.items()
    }
    return core_period_choice

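# Illustrative sketch (hypothetical model with choices ``a`` and ``b``): after
# ``create_is_inadmissible``, the helper columns ``_a`` and ``_b`` are negated so that
# True marks an admissible choice. A state in period 0 for which only ``a`` is
# admissible then ends up under the key ``(0, (True, False))``, and the corresponding
# value is the index of all core rows sharing that period and choice set.
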
def _create_dense_period_choice(
    core, dense, core_key_to_core_indices, core_key_to_complex, optim_paras, options
):
    """Create dense period choice parts of the state space.

    We loop over all dense combinations and calculate choice restrictions for each
    particular dense state space. This information allows us to compile a dict that
    maps a combination of period, choice_set and dense_index into a core_key.

    Note that we do not allow for choice restrictions that interact between core and
    dense covariates. In order to do so, we would have to rewrite this function and
    return explicit state space positions instead of core indices.

    Returns
    -------
    dense_period_choice : dict
        d: (period, choice_set, dense_index) -> core_key

    """
    if not dense:
        for key, complex_ in core_key_to_complex.items():
            dump_objects(
                core.loc[core_key_to_core_indices[key]], "states", complex_, options
            )
        dense_period_choice = {k: i for i, k in core_key_to_complex.items()}
    else:
        choices = [f"_{choice}" for choice in optim_paras["choices"]]
        dense_period_choice = {}
        for dense_idx, (_, dense_vec) in enumerate(dense.items()):
            states = core.copy().assign(**dense_vec)
            states = compute_covariates(states, options["covariates_all"])
            states = create_is_inadmissible(states, optim_paras, options)
            for core_idx, indices in core_key_to_core_indices.items():
                df = states.copy().loc[indices].assign(**dense_vec)
                df[choices] = ~df[choices]
                grouper = df.groupby(choices).groups
                if not len(grouper) == 1:
                    raise ValueError(
                        "Choice restrictions cannot interact between core and dense "
                        "information such that heterogeneous choice sets within a "
                        "period are created. Use penalties in the utility functions "
                        "for that."
                    )
                period_choice = {
                    (core_key_to_complex[core_idx][0], idx, dense_idx): core_idx
                    for idx, indices in grouper.items()
                }
                dense_period_choice = {**dense_period_choice, **period_choice}
                idx = list(grouper.keys())[0]
                dump_objects(
                    df,
                    "states",
                    (core_key_to_complex[core_idx][0], idx, dense_idx),
                    options,
                )

    return dense_period_choice

@parallelize_across_dense_dimensions
def _get_continuation_values(
    dense_complex_index,
    choice_set,
    core_indices,
    child_indices,
    core_index_and_dense_vector_to_dense_index,
    expected_value_functions,
):
    """Get continuation values from child states.

    The continuation values are the discounted expected value functions from child
    states. This function retrieves continuation values that were obtained in the
    model solution. In particular, it assigns continuation values to state-choice
    combinations by using the child indices created in
    :func:`_collect_child_indices`.

    Returns
    -------
    continuation_values : numpy.ndarray
        Array with shape ``(n_states, n_choices)``. Maps each state and choice
        combination into its continuation value.

    """
    if len(dense_complex_index) == 3:
        period, _, dense_idx = dense_complex_index
    elif len(dense_complex_index) == 2:
        period, _ = dense_complex_index
        dense_idx = 0

    n_choices = sum_over_numba_boolean_unituple(choice_set)
    n_states = core_indices.shape[0]

    continuation_values = np.zeros((len(core_indices), n_choices))
    for i in range(n_states):
        for j in range(n_choices):
            core_idx, row_idx = child_indices[i, j]
            idx = (core_idx, dense_idx)
            dense_choice = core_index_and_dense_vector_to_dense_index[idx]

            continuation_values[i, j] = expected_value_functions[dense_choice][row_idx]

    return continuation_values

@parallelize_across_dense_dimensions
def _collect_child_indices(complex_, choice_set, indexer, optim_paras, options):
    """Collect child indices for states.

    The function takes the states of one dense key, applies the law of motion for each
    available choice, and maps the resulting states to core keys and core indices.

    Parameters
    ----------
    complex_ : tuple
        See :ref:`complex`.
    choice_set : tuple
        Tuple representing the admissible choices.
    indexer : numba.typed.Dict
        A dictionary with core states as keys and the core key and core index as
        values.
    optim_paras : dict
        Contains model parameters.
    options : dict
        Contains model options.

    Returns
    -------
    indices : numpy.ndarray
        Array with shape ``(n_states, n_choices, 2)``. Represents the mapping
        (state, choice) -> (core_key, core_index).

    """
    core_columns = create_core_state_space_columns(optim_paras)
    states = load_objects("states", complex_, options)

    n_choices = sum(choice_set)
    indices = np.full((states.shape[0], n_choices, 2), -1, dtype=np.int64)

    indices_valid_choices = [i for i, is_valid in enumerate(choice_set) if is_valid]
    for i, choice in enumerate(indices_valid_choices):
        states_ = states.copy(deep=True)

        states_["choice"] = choice
        states_ = apply_law_of_motion_for_core(states_, optim_paras)

        states_ = states_[["period"] + core_columns]

        indices[:, i, 0], indices[:, i, 1] = map_states_to_core_key_and_core_index(
            states_.to_numpy(), indexer
        )

    return indices