Source code for respy.likelihood

"""Everything related to the estimation with maximum likelihood."""
import warnings
from functools import partial

import numba as nb
import numpy as np
import pandas as pd
from scipy import special

from respy.conditional_draws import create_draws_and_log_prob_wages
from respy.config import MAX_FLOAT
from respy.config import MIN_FLOAT
from respy.parallelization import parallelize_across_dense_dimensions
from respy.parallelization import split_and_combine_df
from respy.pre_processing.data_checking import check_estimation_data
from respy.pre_processing.model_processing import process_params_and_options
from respy.pre_processing.process_covariates import identify_necessary_covariates
from respy.shared import aggregate_keane_wolpin_utility
from respy.shared import compute_covariates
from respy.shared import convert_labeled_variables_to_codes
from respy.shared import create_base_draws
from respy.shared import downcast_to_smallest_dtype
from respy.shared import generate_column_dtype_dict_for_estimation
from respy.shared import map_observations_to_states
from respy.shared import pandas_dot
from respy.shared import rename_labels_to_internal
from respy.shared import select_valid_choices
from respy.shared import subset_cholesky_factor_to_choice_set
from respy.solve import get_solve_func


def get_log_like_func(params, options, df, return_scalar=True):
    """Get the criterion function for maximum likelihood estimation.

    Return a version of the likelihood functions in respy where all arguments
    except the parameter vector are fixed with :func:`functools.partial`. Thus, the
    function can be directly passed into an optimizer or a function for taking
    numerical derivatives.

    Parameters
    ----------
    params : pandas.DataFrame
        DataFrame containing model parameters.
    options : dict
        Dictionary containing model options.
    df : pandas.DataFrame
        The model is fit to this dataset.
    return_scalar : bool, default True
        Indicator for whether the mean log likelihood should be returned. If False,
        a dictionary with the following key and value pairs is returned:

        - "value": mean log likelihood (float)
        - "contributions": log likelihood contributions (numpy.array)
        - "comparison_plot_data": DataFrame with various contributions for the
          visualization with estimagic. The data contains the following columns:

          - ``identifier``: Individual identifiers derived from the input df.
          - ``period``: Periods derived from the input df.
          - ``choice``: Choice that ``value`` is connected to.
          - ``value``: Value of the log likelihood contribution.
          - ``kind``: Kind of contribution (e.g. choice or wage).
          - ``type`` and ``log_type_probability``: Only included in models with
            types.

    Returns
    -------
    criterion_function : :func:`log_like`
        Criterion function where all arguments except the parameter vector are set.

    Raises
    ------
    AssertionError
        If the data does not have the expected format.

    Examples
    --------
    >>> import respy as rp
    >>> params, options, data = rp.get_example_model("robinson_crusoe_basic")

    By default, the function returns the log likelihood as a scalar value.

    >>> log_like = rp.get_log_like_func(params=params, options=options, df=data)
    >>> scalar = log_like(params)

    Alternatively, a dictionary containing the log likelihood, as well as the log
    likelihood contributions and a :class:`pandas.DataFrame`, can be returned.

    >>> log_like = rp.get_log_like_func(
    ...     params=params, options=options, df=data, return_scalar=False
    ... )
    >>> outputs = log_like(params)
    >>> outputs.keys()
    dict_keys(['value', 'contributions', 'comparison_plot_data'])

    """
    optim_paras, options = process_params_and_options(params, options)

    optim_paras = _update_optim_paras_with_initial_experience_levels(optim_paras, df)

    check_estimation_data(df, optim_paras)

    solve = get_solve_func(params, options)
    state_space = solve.keywords["state_space"]

    df, type_covariates = _process_estimation_data(
        df, state_space, optim_paras, options
    )

    # Replace with decorator.
    base_draws_est = {}
    for dense_key, indices in df.groupby("dense_key").groups.items():
        n_choices = sum(state_space.dense_key_to_choice_set[dense_key])
        draws = create_base_draws(
            (len(indices), options["estimation_draws"], n_choices),
            next(options["estimation_seed_startup"]),
            options["monte_carlo_sequence"],
        )
        base_draws_est[dense_key] = draws

    criterion_function = partial(
        log_like,
        df=df,
        base_draws_est=base_draws_est,
        solve=solve,
        type_covariates=type_covariates,
        options=options,
        return_scalar=return_scalar,
    )
    return criterion_function


def log_like(
    params,
    df,
    base_draws_est,
    solve,
    type_covariates,
    options,
    return_scalar,
):
    """Criterion function for the likelihood maximization.

    This function calculates the likelihood contributions of the sample.

    Parameters
    ----------
    params : pandas.Series
        Parameter Series.
    df : pandas.DataFrame
        The DataFrame contains choices, log wages, and the indices of the states for
        the different types.
    base_draws_est : dict
        Dictionary mapping dense keys to sets of draws which are used to calculate
        the probability of observed wages.
    solve : :func:`~respy.solve.solve`
        Function which solves the model with new parameters.
    type_covariates : pandas.DataFrame or None
        If the model includes types, a :class:`pandas.DataFrame` containing the
        covariates to compute the type probabilities.
    options : dict
        Contains model options.
    return_scalar : bool
        Indicator for whether the mean log likelihood should be returned as a
        scalar instead of a dictionary with additional outputs.

    """
    optim_paras, options = process_params_and_options(params, options)

    state_space = solve(params)

    contribs, df, log_type_probabilities = _internal_log_like_obs(
        state_space, df, base_draws_est, type_covariates, optim_paras, options
    )

    # Return mean log likelihood or log likelihood contributions.
    out = contribs.mean()
    if not return_scalar:
        out = {
            "value": out,
            "contributions": contribs,
            "comparison_plot_data": _create_comparison_plot_data(
                df, log_type_probabilities, optim_paras
            ),
        }

    return out


def _internal_log_like_obs(
    state_space, df, base_draws_est, type_covariates, optim_paras, options
):
    """Calculate the likelihood contribution of each individual in the sample.

    The function calculates all likelihood contributions for all observations in the
    data which means all individual-period-type combinations. Then, likelihoods are
    accumulated within each individual and type over all periods. After that, the
    result is multiplied with the type-specific shares which yields the contribution
    to the likelihood for each individual.

    Parameters
    ----------
    state_space : :class:`~respy.state_space.StateSpace`
        Class of state space.
    df : pandas.DataFrame
        The DataFrame contains choices, log wages, and the indices of the states for
        the different types.
    base_draws_est : dict
        Dictionary mapping dense keys to arrays with shape
        (n_observations, n_draws, n_choices) containing i.i.d. draws from standard
        normal distributions.
    type_covariates : pandas.DataFrame or None
        If the model includes types, this is a :class:`pandas.DataFrame` containing
        the covariates to compute the type probabilities.
    optim_paras : dict
        Dictionary with quantities that were extracted from the parameter vector.
    options : dict
        Options of the model.

    Returns
    -------
    contribs : numpy.ndarray
        Array with shape (n_individuals,) containing contributions of individuals in
        the empirical data.
    df : pandas.DataFrame
        Contains log wages, choices, and the log likelihood contributions of choices
        and wages per observation.
    log_type_probabilities : pandas.DataFrame or None
        If the model includes types, the log probabilities of being each type per
        individual; otherwise None.

    """
    df = df.copy()

    n_types = optim_paras["n_types"]

    wages = state_space.wages
    nonpecs = state_space.nonpecs
    continuation_values = {}
    for period in range(options["n_periods"]):
        continuation_values = {
            **continuation_values,
            **state_space.get_continuation_values(period),
        }

    df = _compute_wage_and_choice_log_likelihood_contributions(
        df,
        base_draws_est,
        wages,
        nonpecs,
        continuation_values,
        state_space.dense_key_to_choice_set,
        optim_paras,
        options,
    )

    # Aggregate choice probabilities and wage densities to log likes per observation.
    loglikes = (
        df.groupby(["identifier", "period", "type"])[
            ["loglike_choice", "loglike_wage"]
        ]
        .first()
        .unstack("type")
        if optim_paras["n_types"] >= 2
        else df[["loglike_choice", "loglike_wage"]]
    )
    per_observation_loglikes = loglikes["loglike_choice"] + loglikes["loglike_wage"]
    per_individual_loglikes = per_observation_loglikes.groupby("identifier").sum()

    if n_types >= 2:
        # To not alter the attribute in the functools.partial, create a copy.
        type_covariates = type_covariates.copy()

        # Weight each type-specific individual log likelihood with the type
        # probability.
        log_type_probabilities = _compute_log_type_probabilities(
            type_covariates, optim_paras, options
        )
        weighted_loglikes = per_individual_loglikes + log_type_probabilities

        contribs = special.logsumexp(weighted_loglikes, axis=1)
    else:
        contribs = per_individual_loglikes.to_numpy().flatten()
        log_type_probabilities = None

    contribs = np.clip(contribs, MIN_FLOAT, MAX_FLOAT)

    return contribs, df, log_type_probabilities


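# A minimal sketch of the type weighting performed above, assuming two types with
# log shares `log_pi` and one individual's per-type log likelihoods `ll`
# (hypothetical numbers, not part of the module):
#
#     log_pi = np.log(np.array([0.4, 0.6]))
#     ll = np.array([[-10.0, -12.0]])
#     contrib = special.logsumexp(ll + log_pi, axis=1)
#     # Equivalent to np.log(0.4 * np.exp(-10.0) + 0.6 * np.exp(-12.0)), i.e. the
#     # individual likelihood is a type-share-weighted mixture.

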
@split_and_combine_df
@parallelize_across_dense_dimensions
def _compute_wage_and_choice_log_likelihood_contributions(
    df,
    base_draws_est,
    wages,
    nonpecs,
    continuation_values,
    choice_set,
    optim_paras,
    options,
):
    """Compute wage and choice log likelihood contributions."""
    n_wages = len(select_valid_choices(optim_paras["choices_w_wage"], choice_set))

    indices = df["core_index"].to_numpy()
    selected_wages = wages[indices]

    log_wages_observed = df["log_wage"].to_numpy()
    choices = _map_choice_codes_to_indices_of_valid_choice_set(
        df["choice"].to_numpy(), choice_set
    )

    shocks_cholesky = subset_cholesky_factor_to_choice_set(
        optim_paras["shocks_cholesky"], choice_set
    )

    draws, wage_loglikes = create_draws_and_log_prob_wages(
        log_wages_observed,
        selected_wages,
        base_draws_est,
        choices,
        shocks_cholesky,
        n_wages,
        optim_paras["meas_error"],
        optim_paras["has_meas_error"],
    )

    n_choices = wages.shape[1]
    n_observations = df.shape[0]
    draws = draws.reshape(n_observations, -1, n_choices)

    selected_continuation_values = continuation_values[indices]

    choice_loglikes = _simulate_log_probability_of_individuals_observed_choice(
        selected_wages,
        nonpecs[indices],
        selected_continuation_values,
        draws,
        optim_paras["beta_delta"],
        choices,
        options["estimation_tau"],
    )

    df["loglike_choice"] = np.clip(choice_loglikes, MIN_FLOAT, MAX_FLOAT)
    df["loglike_wage"] = np.clip(wage_loglikes, MIN_FLOAT, MAX_FLOAT)

    return df


def _compute_log_type_probabilities(df, optim_paras, options):
    """Compute the log type probabilities."""
    x_betas = _compute_x_beta_for_type_probabilities(
        df, optim_paras=optim_paras, options=options
    )

    log_probabilities = x_betas - special.logsumexp(x_betas, axis=1, keepdims=True)
    log_probabilities = np.clip(log_probabilities, MIN_FLOAT, MAX_FLOAT)

    return log_probabilities


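# The log-softmax above can be checked by hand (a sketch with hypothetical x_betas
# for one individual and two types, illustrative only):
#
#     x_betas = np.array([[0.5, -0.5]])
#     log_p = x_betas - special.logsumexp(x_betas, axis=1, keepdims=True)
#     # np.exp(log_p) sums to one per row, giving the type probabilities.

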
@split_and_combine_df
@parallelize_across_dense_dimensions
def _compute_x_beta_for_type_probabilities(df, optim_paras, options):
    """Compute the vector dot product of type covariates and type coefficients.

    For each individual, compute as many vector dot products as there are types. The
    scalars are later passed to a softmax function which yields the probability for
    each individual of being each type.

    """
    for type_ in range(optim_paras["n_types"]):
        first_observations = df.copy().assign(type=type_)
        relevant_covariates = identify_necessary_covariates(
            optim_paras["type_prob"][type_].index, options["covariates_all"]
        )
        first_observations = compute_covariates(
            first_observations, relevant_covariates
        )

        df[type_] = pandas_dot(first_observations, optim_paras["type_prob"][type_])

    return df[range(optim_paras["n_types"])]


@nb.njit
def _logsumexp(x):
    """Compute logsumexp of `x`.

    The function does the same as the following code, but faster.

    .. code-block:: python

        log_sum_exp = np.max(x) + np.log(np.sum(np.exp(x - np.max(x))))

    The subtraction of the maximum prevents overflows and mitigates the impact of
    underflows.

    """
    # Search maximum.
    max_x = None
    length = len(x)
    for i in range(length):
        if max_x is None or x[i] > max_x:
            max_x = x[i]

    # Calculate sum of exponential differences.
    sum_exp = 0
    for i in range(length):
        diff = x[i] - max_x
        sum_exp += np.exp(diff)

    log_sum_exp = max_x + np.log(sum_exp)

    return log_sum_exp


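# A small illustration of why the maximum is subtracted in `_logsumexp`, assuming
# large inputs (illustrative only, not part of the module):
#
#     x = np.array([1000.0, 1000.0])
#     np.log(np.sum(np.exp(x)))  # np.exp(1000.0) overflows, the result is inf
#     _logsumexp(x)              # stable: returns 1000.0 + np.log(2.0)

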
@nb.guvectorize(
    ["f8[:], f8[:], f8[:], f8[:, :], f8, i8, f8, f8[:]"],
    "(n_choices), (n_choices), (n_choices), (n_draws, n_choices), (), (), () -> ()",
    nopython=True,
    target="parallel",
)
def _simulate_log_probability_of_individuals_observed_choice(
    wages,
    nonpec,
    continuation_values,
    draws,
    delta,
    choice,
    tau,
    smoothed_log_probability,
):
    r"""Simulate the probability of observing the agent's choice.

    The probability is simulated by iterating over a distribution of unobservables.
    First, the utility of each choice is computed. Then, the probability of observing
    the choice of the agent given the maximum utility from all choices is computed.

    The naive implementation calculates the log probability for choice `i` with the
    softmax function.

    .. math::

        \log(\text{softmax}(x)_i) = \log\left(
            \frac{e^{x_i}}{\sum^J e^{x_j}}
        \right)

    The following function is numerically more robust. The derivation with the two
    consecutive `logsumexp` functions is included in `#288
    <https://github.com/OpenSourceEconomics/respy/pull/288>`_.

    Parameters
    ----------
    wages : numpy.ndarray
        Array with shape (n_choices,).
    nonpec : numpy.ndarray
        Array with shape (n_choices,).
    continuation_values : numpy.ndarray
        Array with shape (n_choices,).
    draws : numpy.ndarray
        Array with shape (n_draws, n_choices).
    delta : float
        Discount rate.
    choice : int
        Choice of the agent.
    tau : float
        Smoothing parameter for choice probabilities.

    Returns
    -------
    smoothed_log_probability : float
        Simulated smoothed log probability of the observed choice.

    """
    n_draws, n_choices = draws.shape

    smoothed_log_probabilities = np.empty(n_draws)
    smoothed_value_functions = np.empty(n_choices)

    for i in range(n_draws):

        for j in range(n_choices):
            value_function, _ = aggregate_keane_wolpin_utility(
                wages[j], nonpec[j], continuation_values[j], draws[i, j], delta
            )
            smoothed_value_functions[j] = value_function / tau

        smoothed_log_probabilities[i] = smoothed_value_functions[choice] - _logsumexp(
            smoothed_value_functions
        )

    smoothed_log_prob = _logsumexp(smoothed_log_probabilities) - np.log(n_draws)

    smoothed_log_probability[0] = smoothed_log_prob


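# A back-of-the-envelope look at the smoothing parameter `tau`, assuming two
# choices with value functions v = (1.0, 2.0) (hypothetical numbers):
#
#     v = np.array([1.0, 2.0])
#     tau = 1.0
#     log_p0 = v[0] / tau - special.logsumexp(v / tau)  # approx. -1.313
#     # As tau -> 0 the smoothed probabilities approach the hard argmax; for
#     # large tau they approach the uniform distribution over choices.

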
def _process_estimation_data(df, state_space, optim_paras, options):
    """Process estimation data.

    All necessary objects for :func:`_internal_log_like_obs` dependent on the data
    are produced. Some objects have to be repeated for each type which is a desirable
    format for the estimation where every observation is weighted by type
    probabilities.

    Parameters
    ----------
    df : pandas.DataFrame
        The DataFrame which contains the data used for estimation. The DataFrame
        contains individual identifiers, periods, experiences, lagged choices,
        choices in the current period, the wage, and other observed data.
    state_space : :class:`~respy.state_space.StateSpace`
        The state space of the model.
    optim_paras : dict
    options : dict

    Returns
    -------
    df : pandas.DataFrame
        The processed data with mapped state indices and clipped log wages, repeated
        for each type.
    type_covariates : pandas.DataFrame or None
        DataFrame with shape (n_individuals, n_type_covariates) containing covariates
        to predict probabilities for each type. None if the model has only one type.

    """
    n_types = optim_paras["n_types"]
    col_dtype = generate_column_dtype_dict_for_estimation(optim_paras)

    df = (
        df.sort_index()[list(col_dtype)[2:]]
        .rename(columns=rename_labels_to_internal)
        .rename_axis(index=rename_labels_to_internal)
    )
    df = convert_labeled_variables_to_codes(df, optim_paras)

    # Duplicate observations for each type.
    if n_types >= 2:
        df = pd.concat([df.copy().assign(type=i) for i in range(n_types)])

    df["dense_key"], df["core_index"] = map_observations_to_states(
        df, state_space, optim_paras
    )

    # For the estimation, log wages are needed with shape (n_observations, n_types).
    df["log_wage"] = np.log(np.clip(df.wage.to_numpy(), 1 / MAX_FLOAT, MAX_FLOAT))
    df = df.drop(columns="wage")

    # For the type covariates, we only need the first observation of each individual.
    if n_types >= 2:
        initial_states = df.query("period == 0").copy()
        type_covariates = compute_covariates(
            initial_states, options["covariates_core"], raise_errors=False
        )
        type_covariates = type_covariates.apply(downcast_to_smallest_dtype)
    else:
        type_covariates = None

    return df, type_covariates


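# A minimal sketch of the per-type duplication used above, assuming a toy frame
# with two individuals and n_types = 2 (hypothetical data):
#
#     toy = pd.DataFrame({"identifier": [0, 1], "period": [0, 0]})
#     duplicated = pd.concat([toy.copy().assign(type=i) for i in range(2)])
#     # Four rows: each observation appears once per type, so type-specific
#     # likelihoods can be computed on one long DataFrame.

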
def _update_optim_paras_with_initial_experience_levels(optim_paras, df):
    """Adjust the initial experience levels in optim_paras from the data."""
    for choice in optim_paras["choices_w_exp"]:
        # Adjust initial experience levels for all choices with experiences.
        init_exp_data = np.sort(
            df.query("Period == 0")[f"Experience_{choice.title()}"].unique()
        )
        init_exp_params = np.array(list(optim_paras["choices"][choice]["start"]))
        if not np.array_equal(init_exp_data, init_exp_params):
            warnings.warn(
                f"The initial experience(s) for choice '{choice}' differs between"
                f" data, {init_exp_data}, and parameters, {init_exp_params}. The"
                " parameters are ignored.",
                category=UserWarning,
            )
            optim_paras["choices"][choice]["start"] = init_exp_data
            optim_paras = {
                k: v
                for k, v in optim_paras.items()
                if not k.startswith("lagged_choice_")
            }

    return optim_paras


def _create_comparison_plot_data(df, log_type_probabilities, optim_paras):
    """Create DataFrame for estimagic's comparison plot."""
    df = df.copy()

    df["choice"] = (
        df["choice"].replace(dict(enumerate(optim_paras["choices"]))).astype("category")
    )

    # During the likelihood calculation, the log likelihood for missing wages is
    # substituted with 0. Remove these log likelihoods to get the correct picture.
    is_wage_missing = df.log_wage.isna() & df.choice.isin(optim_paras["choices_w_wage"])
    df.loc[is_wage_missing, "loglike_wage"] = np.nan

    # Keep the log likelihood and the choice.
    columns = df.filter(like="loglike").columns.tolist() + ["choice"]
    df = df[columns]

    df = df.reset_index().melt(id_vars=["identifier", "period", "choice"])
    df = df.astype({"identifier": "uint16", "period": "uint8"})

    splitted_label = df.variable.str.split("_", expand=True)
    df["kind"] = splitted_label[1].astype("category")
    df = df.drop(columns="variable").dropna()

    if log_type_probabilities is not None:
        log_type_probabilities = (
            log_type_probabilities.copy()
            .reset_index()
            .melt(
                id_vars=["identifier", "period"],
                value_vars=range(optim_paras["n_types"]),
                var_name="type",
                value_name="log_type_probability",
            )
        )
        log_type_probabilities.type = log_type_probabilities.type.astype("category")
        # `DataFrame.append` is deprecated; `pd.concat` is behavior-identical here.
        df = pd.concat([df, log_type_probabilities], sort=False)

    return df


def _map_choice_codes_to_indices_of_valid_choice_set(choices, choice_set):
    """Map choice codes to the indices of the valid choice set.

    Choice codes number all choices going from 0 to `n_choices` - 1. In some dense
    indices not all choices are available and, thus, arrays like wages have only as
    many columns as available choices. Therefore, we need to number the available
    choices from 0 to `n_available_choices` - 1 and replace the old choice codes with
    the new ones.

    Examples
    --------
    >>> wages = np.arange(4).reshape(2, 2)
    >>> choices = np.array([0, 2])
    >>> choice_set = (True, False, True)

    >>> np.choose(choices, wages)
    Traceback (most recent call last):
     ...
    ValueError: invalid entry in choice array

    >>> new_choices = _map_choice_codes_to_indices_of_valid_choice_set(
    ...     choices, choice_set
    ... )
    >>> np.choose(new_choices, wages)
    array([0, 3])

    """
    mapper = {value: i for i, value in enumerate(np.where(choice_set)[0])}

    sort_idx = np.argsort(list(mapper))
    idx = np.searchsorted(list(mapper), choices, sorter=sort_idx)

    out = np.asarray(list(mapper.values()))[sort_idx][idx]

    return out