Source code for depp_sfa.depp_sfa

"""
Stochastic Frontier Analysis (SFA) Module.

This module provides the SFA class to estimate production and cost frontiers
using Maximum Likelihood Estimation (MLE) or Bayesian Inference (PyMC).
"""

import math
import logging
import warnings

import numpy as np
import pandas as pd
from scipy.stats import norm
from scipy.optimize import minimize
from scipy.special import log_ndtr
from sklearn.linear_model import LinearRegression
from sklearn.preprocessing import StandardScaler
from statsmodels.tools.numdiff import approx_hess
import pymc as pm

warnings.filterwarnings("ignore", module="arviz")

from . import constant

# Bind constants from the external module
FUN_COST = constant.FUN_COST
FUN_PROD = constant.FUN_PROD
TE_teJ = constant.TE_teJ
TE_te = constant.TE_te
TE_teMod = constant.TE_teMod

# Mute PyMC internal logging to keep the terminal clean
logging.getLogger("pymc").setLevel(logging.ERROR)


[docs] class SFA: """ Stochastic Frontier Analysis (SFA) estimator. This class supports Cross-sectional models (ALS77, BC95), Panel Data models (BC92), and True Effects models (Greene 2005). It provides dual inference backends: Frequentist (MLE) and Bayesian (MCMC via PyMC). :cvar FUN_PROD: Constant indicating a production frontier. :cvar FUN_COST: Constant indicating a cost frontier. :cvar TE_teJ: Constant for Jondrow et al. (1982) efficiency decomposition. :cvar TE_te: Constant for Battese & Coelli (1988) efficiency decomposition. :cvar TE_teMod: Constant for a modified efficiency decomposition. """ FUN_PROD = FUN_PROD FUN_COST = FUN_COST TE_teJ = TE_teJ TE_te = TE_te TE_teMod = TE_teMod def __init__( self, y, x, z=None, id_var=None, time_var=None, fun=FUN_PROD, intercept=True, lamda0=1, method=TE_teJ, form='linear', dummy_indices=None, inference_method='mle', panel_model='bc92', draws=3000, tune=3000, standardize=True ): """ Initialize the SFA model. :param y: Dependent variable (output or cost). :type y: array-like, shape (n_samples,) :param x: Independent variables (inputs). :type x: array-like, shape (n_samples, n_features) :param z: Inefficiency determinants for BC95 models. Defaults to None. :type z: array-like, shape (n_samples, n_z_features), optional :param id_var: Panel group identifier. Defaults to None. :type id_var: array-like, shape (n_samples,), optional :param time_var: Panel time identifier. Defaults to None. :type time_var: array-like, shape (n_samples,), optional :param fun: Type of frontier (FUN_PROD or FUN_COST). Defaults to FUN_PROD. :type fun: int, optional :param intercept: Whether to include an intercept. Defaults to True. :type intercept: bool, optional :param lamda0: Initial guess for lambda in MLE. Defaults to 1. :type lamda0: float, optional :param method: Efficiency decomposition method. Defaults to TE_teJ. :type method: int, optional :param form: Functional form ('linear', 'cobb_douglas', 'translog'). Defaults to 'linear'. :type form: str, optional :param dummy_indices: Column indices in `x` to treat as dummy variables (no log transform). Defaults to None. :type dummy_indices: list of int, optional :param inference_method: 'mle' or 'pymc'. Defaults to 'mle'. :type inference_method: str, optional :param panel_model: 'bc92' or 'greene'. Defaults to 'bc92'. :type panel_model: str, optional :param draws: Number of MCMC draws for PyMC. Defaults to 3000. :type draws: int, optional :param tune: Number of tuning steps for PyMC. Defaults to 3000. :type tune: int, optional :param standardize: Whether to standardize the input variables. Defaults to True. """ self.fun = fun self.intercept = intercept self.lamda0 = lamda0 self.method = method self.form = form self.dummy_indices = dummy_indices if dummy_indices is not None else [] self.inference_method = inference_method.lower() self.panel_model = panel_model.lower() self.draws = draws self.tune = tune self.standardize = standardize # Set error sign based on frontier type (production = 1, cost = -1) self.sign = 1 if self.fun == self.FUN_PROD else -1 self.is_panel = (id_var is not None) and (time_var is not None) self.has_z = z is not None if self.is_panel and self.has_z: raise ValueError( "This class does not support combining " "Z variables (BC95) with Panel data." ) # Convert inputs to numpy arrays y_arr = np.array(y, dtype=float) x_arr = np.array(x, dtype=float) # Apply functional form transformations (e.g., logs, interaction terms) self.y, self.x, self.x_names = self.__transform_data( y_arr, x_arr, self.form, self.dummy_indices, self.standardize ) if self.is_panel: self._setup_panel(id_var, time_var) elif self.has_z: self._setup_z(z) else: self.z = None self.z_names = [] # Internal state tracking self.is_fitted = False self.estimation_method = None self._params = None self._std_err = None self._llf = np.nan self.pymc_trace = None def _setup_panel(self, id_var, time_var): """ Process panel data identifiers and compute maximum time per firm. :param id_var: Array of firm/group identifiers. :param time_var: Array of time periods. """ id_var = np.array(id_var) self.time_array = np.array(time_var, dtype=float) # Create mapping from unique ID to integer index unique_ids = np.unique(id_var) self.num_firms = len(unique_ids) id_map = {val: i for i, val in enumerate(unique_ids)} self.firm_idx = np.array([id_map[val] for val in id_var]) # Compute max time observed for each firm (required for BC92 decay function) self.T_max_per_firm = np.zeros(self.num_firms) for i in range(self.num_firms): mask = self.firm_idx == i self.T_max_per_firm[i] = np.max(self.time_array[mask]) # === GREENE 2005 (TFE) === # If MLE True Fixed Effects, append firm dummies directly to the X matrix if self.panel_model == 'greene' and self.inference_method == 'mle': firm_dummies = pd.get_dummies(id_var, drop_first=True).astype(float).values self.x = np.hstack((self.x, firm_dummies)) dummy_names = [f"Firme_{uid}" for uid in np.unique(id_var)[1:]] self.x_names.extend(dummy_names) def _setup_z(self, z): """ Process environmental/inefficiency variables (Z) for BC95 models. :param z: Array of Z variables. """ z_array = np.array(z, dtype=float) if z_array.ndim == 1: z_array = np.atleast_2d(z_array) if z_array.shape[0] != len(self.y): z_array = z_array.T # Prepend a column of ones for the Z-equation intercept (delta_0) self.z = np.hstack((np.ones((z_array.shape[0], 1)), z_array)) self.z_names = ['delta_0'] + [ f"z{i+1}" for i in range(z_array.shape[1]) ] def __transform_data(self, y, x, form, dummy_indices, standardize): """ Transform raw data into the specified functional form and optionally standardize. :param y: Dependent variable array. :param x: Independent variables array. :param form: 'linear', 'cobb_douglas', or 'translog'. :param dummy_indices: List of indices to exclude from log-transformation and standardization. :param standardize: Whether to standardize continuous variables (mean 0, variance 1). :type standardize: bool :returns: Tuple containing transformed y, transformed x, and variable names. :rtype: tuple """ x_2d = np.atleast_2d(x) if x.ndim == 1 else x n_obs, n_vars = x_2d.shape base_names = [f"x{i+1}" for i in range(n_vars)] # Separate continuous variables from dummy variables cont_idx = [i for i in range(n_vars) if i not in dummy_indices] x_cont = x_2d[:, cont_idx] if dummy_indices: x_dummies = x_2d[:, dummy_indices] dummy_names = [f"d_{base_names[i]}" for i in dummy_indices] else: x_dummies = np.empty((n_obs, 0)) dummy_names = [] # Handle transformations if form in ['cobb_douglas', 'translog']: # Enforce strict positivity for logarithmic transformations if np.any(y <= 0) or np.any(x_cont <= 0): raise ValueError( "Continuous variables must be strictly positive " "for log transformation." ) trans_y = np.log(y) trans_x_cont = np.log(x_cont) cont_names = [f"ln_{base_names[i]}" for i in cont_idx] else: trans_y = y trans_x_cont = x_cont cont_names = [base_names[i] for i in cont_idx] final_x_cont = trans_x_cont # Translog: include squared terms and cross-products if form == 'translog': new_x_cols = [trans_x_cont] final_names = list(cont_names) n_cont = len(cont_idx) for i in range(n_cont): for j in range(i, n_cont): if i == j: # Squared terms: 0.5 * ln(xi)^2 col = 0.5 * (trans_x_cont[:, i] ** 2) new_x_cols.append(col.reshape(-1, 1)) final_names.append(f"0.5*{cont_names[i]}^2") else: # Cross-products: ln(xi) * ln(xj) col = trans_x_cont[:, i] * trans_x_cont[:, j] new_x_cols.append(col.reshape(-1, 1)) final_names.append(f"{cont_names[i]}*{cont_names[j]}") final_x_cont = np.hstack(new_x_cols) cont_names = final_names # Apply standardization ONLY to the continuous block if standardize and final_x_cont.shape[1] > 0: scaler = StandardScaler() final_x_cont = scaler.fit_transform(final_x_cont) # Re-append dummy variables at the end if dummy_indices: final_x = np.hstack((final_x_cont, x_dummies)) else: final_x = final_x_cont return trans_y, final_x, cont_names + dummy_names
[docs] def optimize(self): """ Main estimation router. Checks if the model is already fitted. If not, it routes the estimation to the appropriate MLE or PyMC private method based on user configuration. """ if self.is_fitted: return with warnings.catch_warnings(): warnings.simplefilter("ignore") # === BRANCHE MLE === if self.inference_method == 'mle': if self.is_panel: if self.panel_model == 'greene': self.__optimize_mle() self.estimation_method = 'MLE (Greene 2005 TFE)' else: self.__optimize_mle_panel() self.estimation_method = 'MLE (Panel BC92)' else: self.__optimize_mle() self.estimation_method = 'MLE (BC95 Inefficiency Effects)' if self.has_z else 'MLE (Cross-sectional)' # === BRANCHE PyMC === elif self.inference_method == 'pymc': if self.is_panel: if self.panel_model == 'greene': self.__optimize_pymc_greene_tre() self.estimation_method = 'PyMC (Greene 2005 TRE)' else: self.__optimize_pymc_panel() self.estimation_method = 'PyMC (Panel BC92)' else: self.__optimize_pymc_cross() self.estimation_method = 'PyMC (BC95 Inefficiency Effects)' if self.has_z else 'PyMC (Cross-sectional)' self.is_fitted = True
def __optimize_mle(self): """ Maximum Likelihood Estimation for Cross-sectional, Z-models & TFE. Uses OLS to generate starting values, then constructs and minimizes the negative log-likelihood function using the L-BFGS-B algorithm. """ # OLS Initialization reg = LinearRegression(fit_intercept=self.intercept).fit(X=self.x, y=self.y) if self.intercept: beta_init = np.concatenate(([reg.intercept_], reg.coef_)) else: beta_init = reg.coef_ K = len(beta_init) N = len(self.x) y_pred_init = reg.predict(self.x) resid_var = np.var(self.y - y_pred_init) # Setup initial parameters if self.has_z: delta_init = np.zeros(self.z.shape[1]) parm = np.concatenate((beta_init, delta_init, [resid_var, 0.5])) else: lam0 = self.lamda0 gamma_init = (lam0**2) / (1 + lam0**2) parm = np.concatenate((beta_init, [resid_var, gamma_init])) def __loglik(p): # Standard Cross-sectional Likelihood if not self.has_z: beta = p[0:K] sigma2, gamma = p[-2], p[-1] if sigma2 <= 0 or gamma <= 0 or gamma >= 0.9999: return 1e15 y_pred = beta[0] + np.dot(self.x, beta[1:]) if self.intercept else np.dot(self.x, beta) res = self.y - y_pred lam = np.sqrt(gamma / (1 - gamma)) ll = ( -0.5 * N * np.log(2 * math.pi) - 0.5 * N * np.log(sigma2) + np.sum(log_ndtr(-self.sign * res * lam / math.sqrt(sigma2))) - 0.5 * np.sum(res**2) / sigma2 ) return -ll # BC95 Z-Variables Likelihood else: beta = p[0:K] delta = p[K : K + self.z.shape[1]] sigma2, gamma = p[-2], p[-1] if sigma2 <= 0 or gamma <= 0 or gamma >= 0.9999: return 1e15 y_pred = beta[0] + np.dot(self.x, beta[1:]) if self.intercept else np.dot(self.x, beta) eps = self.sign * (self.y - y_pred) mu = np.dot(self.z, delta) sigma_star = np.sqrt(gamma * (1 - gamma) * sigma2) mu_star = (1 - gamma) * mu - gamma * eps ll = ( -0.5 * np.log(sigma2) - 0.5 * ((eps + mu)**2 / sigma2) + log_ndtr(mu_star / sigma_star) - log_ndtr(mu / np.sqrt(gamma * sigma2)) ) return -np.sum(ll) method = 'L-BFGS-B' # Set bounds: gamma must be strictly between 0 and 1, sigma2 must be positive if self.has_z: bounds = ([(None, None)] * K + [(None, None)] * self.z.shape[1] + [(1e-6, None), (1e-6, 0.9999)]) else: bounds = ([(None, None)] * K + [(1e-6, None), (1e-6, 0.9999)]) res = minimize(__loglik, parm, method=method, bounds=bounds) self._params = res.x self._llf = -res.fun try: hess_exact = approx_hess(res.x, __loglik) hess_inv_exact = np.linalg.inv(hess_exact) self._std_err = np.sqrt(np.diag(hess_inv_exact)) except Exception: try: hessian_approx = res.hess_inv.todense() if hasattr(res.hess_inv, 'todense') else res.hess_inv self._std_err = np.sqrt(np.diag(hessian_approx)) except (AttributeError, ValueError): self._std_err = np.full_like(res.x, np.nan) def __optimize_mle_panel(self): """ Frequentist MLE for BC92 (Time-Varying Panel Data). Estimates a global decay parameter (eta) that models the temporal evolution of inefficiency. """ x_mat = np.hstack([np.ones((self.x.shape[0], 1)), self.x]) if self.intercept else self.x reg = LinearRegression(fit_intercept=False).fit(x_mat, self.y) y_pred = reg.predict(x_mat) resid_var = np.var(self.y - y_pred) beta_ols = reg.coef_ def bc92_ll(params): num_k = x_mat.shape[1] beta = params[:num_k] eta = params[num_k] sig2 = params[num_k + 1] gamma = params[num_k + 2] mu = 0.0 if sig2 <= 0 or gamma <= 0 or gamma >= 0.9999: return 1e15 epsilon = self.y - np.dot(x_mat, beta) # BC92 Time decay function: f_it = exp(-eta * (t - T_i)) t_diff = self.time_array - self.T_max_per_firm[self.firm_idx] f_it = np.exp(-eta * t_diff) sig_u2 = gamma * sig2 sig_v2 = (1 - gamma) * sig2 total_ll = 0 # Sum log-likelihood over each firm i for i in range(self.num_firms): mask = self.firm_idx == i eps_i, f_i = epsilon[mask], f_it[mask] num_ti = len(eps_i) sum_f2 = np.sum(f_i**2) sum_f_eps = np.sum(f_i * eps_i) di_term = 1 + (sig_u2 / sig_v2) * sum_f2 mu_star = ((mu * sig_v2 - self.sign * sig_u2 * sum_f_eps) / (sig_v2 + sig_u2 * sum_f2)) sig_star = np.sqrt((sig_u2 * sig_v2) / (sig_v2 + sig_u2 * sum_f2)) ll_i = ( -0.5 * num_ti * np.log(2 * np.pi * sig_v2) - 0.5 * (np.sum(eps_i**2) / sig_v2) - 0.5 * (mu**2 / sig_u2) + 0.5 * (mu_star**2 / sig_star**2) + log_ndtr(mu_star / sig_star) - log_ndtr(mu / np.sqrt(sig_u2)) - 0.5 * np.log(di_term) ) total_ll += ll_i return -total_ll # Grid search over gamma and eta to find robust starting values best_ll = float('inf') best_start = None for g_test in np.linspace(0.1, 0.9, 9): for e_test in [-0.05, 0.0, 0.05]: test_params = np.concatenate([beta_ols, [e_test, resid_var, g_test]]) current_ll = bc92_ll(test_params) if current_ll < best_ll: best_ll = current_ll best_start = test_params bounds = ([(None, None)] * x_mat.shape[1] + [(None, None), (1e-6, None), (1e-6, 0.9999)]) res = minimize(bc92_ll, best_start, method='L-BFGS-B', bounds=bounds, options={'ftol': 1e-12, 'gtol': 1e-8}) self._params = res.x self._llf = -res.fun try: hess_exact = approx_hess(res.x, bc92_ll) hess_inv_exact = np.linalg.inv(hess_exact) self._std_err = np.sqrt(np.diag(hess_inv_exact)) except Exception as e: warnings.warn(f"Hessian inversion failed: {e}. Falling back to BFGS approximation.") try: h_approx = res.hess_inv.todense() if hasattr(res.hess_inv, 'todense') else res.hess_inv self._std_err = np.sqrt(np.diag(h_approx)) except: self._std_err = np.full_like(res.x, np.nan) def __optimize_pymc_cross(self): """ Bayesian Estimation via PyMC for Cross-sectional & BC95 Models. Uses uninformative/weakly informative priors to define the Bayesian network. Samples from the posterior distribution using NUTS (No-U-Turn Sampler). """ with pm.Model() as model: # Priors for the frontier parameters beta = pm.Normal('beta', mu=0, sigma=3, shape=len(self.x[0])) if self.intercept: beta0 = pm.Normal('beta0', mu=0, sigma=5) mu_y = beta0 + pm.math.dot(self.x, beta) else: mu_y = pm.math.dot(self.x, beta) # Priors for error variances sigma_v = pm.HalfNormal('sigma_v', sigma=1) sigma_u = pm.HalfNormal('sigma_u', sigma=1) # Define inefficiency distribution (U) if self.has_z: delta = pm.Normal('delta', mu=0, sigma=3, shape=self.z.shape[1]) mu_u = pm.math.dot(self.z, delta) # Non-Centered Parameterization for Truncated Normal U_raw = pm.TruncatedNormal('U_raw', mu=0, sigma=1, lower=0, shape=self.x.shape[0]) U = pm.Deterministic('U', mu_u + U_raw * sigma_u) else: # Non-Centered Parameterization for Half-Normal U_raw = pm.HalfNormal('U_raw', sigma=1, shape=self.x.shape[0]) U = pm.Deterministic('U', U_raw * sigma_u) # Deterministic calculation of Technical Efficiency pm.Deterministic('TE', pm.math.exp(-U)) # Compose final error term based on frontier orientation mu_final = mu_y - U if self.sign == 1 else mu_y + U pm.Normal('Y_obs', mu=mu_final, sigma=sigma_v, observed=self.y) trace = pm.sample(draws=self.draws, tune=self.tune, target_accept=0.99, progressbar=True, return_inferencedata=True) self.__extract_pymc_params(trace, model_type='cross') def __optimize_pymc_panel(self): """ Bayesian Estimation via PyMC for Panel Data (BC92). Incorporates the temporal decay parameter (eta) into the MCMC sampling. """ with pm.Model() as model: beta = pm.Normal('beta', mu=0, sigma=3, shape=len(self.x[0])) if self.intercept: beta0 = pm.Normal('beta0', mu=0, sigma=5) mu_y = beta0 + pm.math.dot(self.x, beta) else: mu_y = pm.math.dot(self.x, beta) sigma_v = pm.HalfNormal('sigma_v', sigma=1) sigma_u = pm.HalfNormal('sigma_u', sigma=1) mu = pm.Normal('mu', mu=0, sigma=1) eta = pm.Normal('eta', mu=0, sigma=0.2) # Base inefficiency term per firm (Non-Centered Parameterization) U_raw = pm.TruncatedNormal('U_raw', mu=0, sigma=1, lower=0, shape=self.num_firms) U_i = pm.Deterministic('U_i', mu + U_raw * sigma_u) # BC92 exponential decay time_diff = self.time_array - self.T_max_per_firm[self.firm_idx] decay = pm.math.exp(-eta * time_diff) U_it = pm.Deterministic('U_it', U_i[self.firm_idx] * decay) pm.Deterministic('TE', pm.math.exp(-U_it)) mu_final = mu_y - U_it if self.sign == 1 else mu_y + U_it pm.Normal('Y_obs', mu=mu_final, sigma=sigma_v, observed=self.y) trace = pm.sample(draws=self.draws, tune=self.tune, target_accept=0.99, progressbar=True, return_inferencedata=True) self.__extract_pymc_params(trace, model_type='panel') def __optimize_pymc_greene_tre(self): """ Bayesian Estimation via PyMC for Greene 2005 True Random Effects (TRE). Separates structural unobserved heterogeneity (alpha_i) from pure managerial inefficiency (U_it). """ with pm.Model() as model: beta = pm.Normal('beta', mu=0, sigma=5, shape=len(self.x[0])) # Hierarchical structure for Firm-specific Random Effects mu_alpha = pm.Normal('mu_alpha', mu=0, sigma=5) sigma_alpha = pm.HalfNormal('sigma_alpha', sigma=2) alpha_offset = pm.Normal('alpha_offset', mu=0, sigma=1, shape=self.num_firms) alpha_i = pm.Deterministic('alpha_i', mu_alpha + alpha_offset * sigma_alpha) mu_y = alpha_i[self.firm_idx] + pm.math.dot(self.x, beta) sigma_u = pm.HalfNormal('sigma_u', sigma=2) U_raw = pm.HalfNormal('U_raw', sigma=1, shape=len(self.y)) U_it = pm.Deterministic('U_it', U_raw * sigma_u) sigma_v = pm.HalfNormal('sigma_v', sigma=2) pm.Deterministic('TE', pm.math.exp(-U_it)) mu_final = mu_y - U_it if self.sign == 1 else mu_y + U_it pm.Normal('Y_obs', mu=mu_final, sigma=sigma_v, observed=self.y) trace = pm.sample( draws=self.draws, tune=self.tune, target_accept=0.99, progressbar=True, return_inferencedata=True ) self.__extract_pymc_params(trace, model_type='tre') def __extract_pymc_params(self, trace, model_type): """ Standardize PyMC posterior output to perfectly match the MLE return structure. Extracts posterior means and standard deviations to populate `self._params` and `self._std_err`. :param trace: The PyMC InferenceData object. :param model_type: Str indicating 'cross', 'panel', or 'tre'. """ self.pymc_trace = trace post = trace.posterior beta_m = post['beta'].mean(dim=['chain', 'draw']).values beta_s = post['beta'].std(dim=['chain', 'draw']).values # Handle intercept if self.intercept and model_type != 'tre': betas = np.concatenate(([post['beta0'].mean().values], beta_m)) betas_se = np.concatenate(([post['beta0'].std().values], beta_s)) else: betas, betas_se = beta_m, beta_s if model_type == 'tre': mu_alpha_m, mu_alpha_s = post['mu_alpha'].mean().values, post['mu_alpha'].std().values s_alpha_m, s_alpha_s = post['sigma_alpha'].mean().values, post['sigma_alpha'].std().values su_m, su_s = post['sigma_u'].mean().values, post['sigma_u'].std().values sv_m, sv_s = post['sigma_v'].mean().values, post['sigma_v'].std().values self._params = np.concatenate((betas, [mu_alpha_m, s_alpha_m, su_m, sv_m])) self._std_err = np.concatenate((betas_se, [mu_alpha_s, s_alpha_s, su_s, sv_s])) else: # Reconstruct sigma2 and gamma from Bayesian sigma_u and sigma_v sigma_u_post = post['sigma_u'] sigma_v_post = post['sigma_v'] sigma2_post = sigma_u_post**2 + sigma_v_post**2 gamma_post = (sigma_u_post**2) / sigma2_post sigma2_m, sigma2_s = sigma2_post.mean().values, sigma2_post.std().values gamma_m, gamma_s = gamma_post.mean().values, gamma_post.std().values if model_type == 'panel': eta_m, eta_s = post['eta'].mean().values, post['eta'].std().values mu_m, mu_s = post['mu'].mean().values, post['mu'].std().values self._params = np.concatenate((betas, [mu_m, eta_m, sigma2_m, gamma_m])) self._std_err = np.concatenate((betas_se, [mu_s, eta_s, sigma2_s, gamma_s])) elif self.has_z: delta_m = post['delta'].mean(dim=['chain', 'draw']).values delta_s = post['delta'].std(dim=['chain', 'draw']).values self._params = np.concatenate((betas, delta_m, [sigma2_m, gamma_m])) self._std_err = np.concatenate((betas_se, delta_s, [sigma2_s, gamma_s])) else: self._params = np.concatenate((betas, [sigma2_m, gamma_m])) self._std_err = np.concatenate((betas_se, [sigma2_s, gamma_s])) # LLF does not strictly map to Bayesian MCMC in the same way self._llf = np.nan
[docs] def get_beta(self): """ Get the estimated coefficients for the frontier equation. :returns: Array of estimated beta coefficients. :rtype: numpy.ndarray """ self.optimize() K = len(self.x[0]) + (1 if self.intercept else 0) return self._params[0:K]
[docs] def get_residuals(self): """ Get the model residuals (epsilon = y - X*beta). :returns: Array of residuals. :rtype: numpy.ndarray """ self.optimize() beta = self.get_beta() if self.intercept: return self.y - beta[0] - np.dot(self.x, beta[1:]) return self.y - np.dot(self.x, beta)
[docs] def get_lambda(self): """ Get the ratio of standard deviations (lambda = sigma_u / sigma_v). :returns: Lambda value. :rtype: float """ self.optimize() gamma = self._params[-1] return np.sqrt(gamma / (1 - gamma))
[docs] def get_sigma2(self): """ Get the total variance of the composite error (sigma^2). :returns: Sigma squared value. :rtype: float """ self.optimize() return self._params[-2]
def __teJ(self): """ Calculate Technical Efficiency using Jondrow et al. (1982). E[exp(-u) | e] """ lam = self.get_lambda() self.ustar = -self.sign * self.get_residuals() * (lam**2 / (1+lam**2)) self.sstar = (lam / (1 + lam**2)) * math.sqrt(self.get_sigma2()) ratio = self.ustar / self.sstar log_term = norm.logpdf(ratio) - log_ndtr(ratio) return np.exp(-self.ustar - self.sstar * np.exp(log_term)) def __te(self): """ Calculate Technical Efficiency using Battese & Coelli (1988). """ lam = self.get_lambda() self.ustar = -self.sign * self.get_residuals() * (lam**2 / (1+lam**2)) self.sstar = (lam / (1 + lam**2)) * math.sqrt(self.get_sigma2()) ratio = self.ustar / self.sstar log_term = log_ndtr(ratio - self.sstar) - log_ndtr(ratio) return np.exp(log_term + (self.sstar**2 / 2) - self.ustar) def __teMod(self): """ Calculate Technical Efficiency using the modified approach. """ lam = self.get_lambda() self.ustar = -self.sign * self.get_residuals() * (lam**2 / (1+lam**2)) return np.exp(np.minimum(0, -self.ustar))
[docs] def get_technical_efficiency(self): """ Get the technical efficiency scores for all observations. If Bayesian (PyMC) was used, returns the mean of the posterior `TE` distribution. Otherwise, applies the user-selected frequentist decomposition. :returns: Array of efficiency scores bounded between 0 and 1. :rtype: numpy.ndarray """ self.optimize() if self.estimation_method and 'PyMC' in self.estimation_method: return self.pymc_trace.posterior['TE'].mean(dim=['chain', 'draw']).values if self.method == self.TE_teJ: return self.__teJ() elif self.method == self.TE_te: return self.__te() elif self.method == self.TE_teMod: return self.__teMod() else: raise ValueError("Undefined decomposition technique.")
[docs] def summary(self): """ Print a formatted summary table of the estimation results. Outputs coefficients, standard errors, z-values, p-values, and diagnostic information (ESS checks for Bayesian models). """ self.optimize() ess_map = {} # Diagnostic check for MCMC to warn users of poor convergence if self.inference_method == 'pymc' and self.pymc_trace is not None: import arviz as az diag = az.summary(self.pymc_trace) for idx in diag.index: ess_map[idx] = diag.loc[idx, 'ess_bulk'] su_ess = ess_map.get('sigma_u', 9999) sv_ess = ess_map.get('sigma_v', 9999) if su_ess < 400 or sv_ess < 400: ess_map['sigma2'], ess_map['gamma'] = 0, 0 names = (['(Intercept)'] + list(self.x_names)) if self.intercept else list(self.x_names) # Build parameter names mapping based on the active model if self.is_panel and self.panel_model == 'greene': if 'pymc' in self.inference_method: names = list(self.x_names) + ['mu_alpha', 'sigma_alpha', 'sigma_u', 'sigma_v', 'sigma2', 'gamma'] else: names += ['sigma_u', 'sigma_v', 'sigma2', 'gamma'] elif self.is_panel: if 'pymc' in self.inference_method: names += ['mu', 'eta', 'sigma_u', 'sigma_v', 'sigma2', 'gamma'] else: names += ['eta', 'sigma_u', 'sigma_v', 'sigma2', 'gamma'] elif self.has_z: names += self.z_names + ['sigma_u', 'sigma_v', 'sigma2', 'gamma'] else: names += ['sigma_u', 'sigma_v', 'sigma2', 'gamma'] s2 = self._params[-2] gam = self._params[-1] s2_se = self._std_err[-2] gam_se = self._std_err[-1] # Back out individual standard deviations for display su_val = np.sqrt(gam * s2) if gam * s2 > 0 else 0 sv_val = np.sqrt((1 - gam) * s2) if (1 - gam) * s2 > 0 else 0 display_params = [] display_std = [] num_main_params = len(self._params) - 2 for i, name in enumerate(names): if i < num_main_params: display_params.append(self._params[i]) display_std.append(self._std_err[i]) elif name == 'sigma_u': display_params.append(su_val) display_std.append(np.nan) elif name == 'sigma_v': display_params.append(sv_val) display_std.append(np.nan) elif name == 'sigma2': display_params.append(s2) display_std.append(s2_se) elif name == 'gamma': display_params.append(gam) display_std.append(gam_se) else: display_params.append(np.nan) display_std.append(np.nan) display_params = np.array(display_params) display_std = np.array(display_std) # Compute Frequentist Z-statistics and P-values with np.errstate(divide='ignore', invalid='ignore'): z_values = display_params / display_std p_values = 2 * norm.sf(np.abs(z_values)) rows = [] for i, name in enumerate(names): current_ess = 9999 if self.inference_method == 'pymc': if name == '(Intercept)': key = 'beta0' elif name in self.x_names: key = f"beta[{list(self.x_names).index(name)}]" else: key = name current_ess = ess_map.get(key, 9999) # Mask out statistics if Effective Sample Size is dangerously low if current_ess < 400: rows.append({ 'Estimate': np.round(display_params[i], 5), 'Std. Error': 'Low ESS', 'z value': '---', 'Pr(>|z|)': '---', 'Sig.': '' }) else: p = p_values[i] std = display_std[i] z = z_values[i] star = ('***' if p < 0.01 else '**' if p < 0.05 else '*' if p < 0.1 else '') if not np.isnan(p) else '' rows.append({ 'Estimate': np.round(display_params[i], 5), 'Std. Error': np.round(std, 6) if not np.isnan(std) else '---', 'z value': np.round(z, 3) if not np.isnan(z) else '---', 'Pr(>|z|)': np.round(p, 4) if not np.isnan(p) else '---', 'Sig.': star }) res_table = pd.DataFrame(rows, index=names) print(f"\nStochastic Frontier Analysis ({self.estimation_method})") print("=" * 75) print(res_table.to_string(na_rep='NaN')) print("-" * 75) print("Signif. codes: 0 '***' 0.01 '**' 0.05 '*' 0.1 ' ' 1") if not np.isnan(self._llf): print(f"Log-Likelihood: {self._llf:.5f}") print("=" * 75) if self.standardize: std_vars = [name for name in self.x_names if not name.startswith('d_') and 'contract_type' not in name] print("\nInterpretation Note:") print("The following continuous variables have been standardized (mean=0, std=1):") import textwrap wrapped_vars = textwrap.fill(", ".join(std_vars), width=76, initial_indent=" ", subsequent_indent=" ") print(wrapped_vars) print("\n -> A coefficient of \u03B2 implies that a 1 standard deviation increase") print(" in the explanatory variable results in a \u03B2 unit change in the") print(" dependent variable (e.g., log_cost).") print("\n")