Source code for bayesreg

from __future__ import print_function
from __future__ import division

import numpy as np
from scipy import optimize, linalg
from scipy.linalg import LinAlgError


[docs] class BLR: """Bayesian linear regression Estimation and prediction of Bayesian linear regression models Basic usage:: B = BLR() hyp = B.estimate(hyp0, X, y) ys,s2 = B.predict(hyp, X, y, Xs) where the variables are :param hyp: vector of hyperparmaters. :param X: N x D data array :param y: 1D Array of targets (length N) :param Xs: Nte x D array of test cases :param hyp0: starting estimates for hyperparameter optimisation :returns: * ys - predictive mean * s2 - predictive variance The hyperparameters are:: hyp = ( log(beta), log(alpha) ) # hyp is a list or numpy array The implementation and notation mostly follows Bishop (2006). The hyperparameter beta is the noise precision and alpha is the precision over lengthscale parameters. This can be either a scalar variable (a common lengthscale for all input variables), or a vector of length D (a different lengthscale for each input variable, derived using an automatic relevance determination formulation). These are estimated using conjugate gradient optimisation of the marginal likelihood. Reference: Bishop (2006) Pattern Recognition and Machine Learning, Springer Written by A. Marquand """ def __init__(self, **kwargs): # parse arguments n_iter = kwargs.get('n_iter', 100) tol = kwargs.get('tol', 1e-3) verbose = kwargs.get('verbose', False) var_groups = kwargs.get('var_groups', None) var_covariates = kwargs.get('var_covariates', None) warp = kwargs.get('warp', None) warp_reparam = kwargs.get('warp_reparam', False) if var_groups is not None and var_covariates is not None: raise ValueError( "var_covariates and var_groups cannot both be used") # basic parameters self.hyp = np.nan self.nlZ = np.nan self.tol = tol # not used at present self.n_iter = n_iter self.verbose = verbose self.var_groups = var_groups if var_covariates is not None: self.hetero_var = True else: self.hetero_var = False if self.var_groups is not None: self.var_ids = set(self.var_groups) self.var_ids = sorted(list(self.var_ids)) # set up warped likelihood if verbose: print('warp:', warp, 'warp_reparam:', warp_reparam) if warp is None: self.warp = None self.n_warp_param = 0 else: self.warp = warp self.n_warp_param = warp.get_n_params() self.warp_reparam = warp_reparam self.gamma = None def _parse_hyps(self, hyp, X, Xv=None): """ Parse hyperparameters into noise precision, lengthscale precision and lengthscale parameters. :param hyp: hyperparameter vector :param X: covariates :param Xv: covariates for heteroskedastic noise """ N = X.shape[0] # noise precision if Xv is not None: if len(Xv.shape) == 1: Dv = 1 Xv = Xv[:, np.newaxis] else: Dv = Xv.shape[1] w_d = np.asarray(hyp[0:Dv]) beta = np.exp(Xv.dot(w_d)) n_lik_param = len(w_d) elif self.var_groups is not None: beta = np.exp(hyp[0:len(self.var_ids)]) n_lik_param = len(beta) else: beta = np.asarray([np.exp(hyp[0])]) n_lik_param = len(beta) # parameters for warping the likelihood function if self.warp is not None: gamma = hyp[n_lik_param:(n_lik_param + self.n_warp_param)] n_lik_param += self.n_warp_param else: gamma = None # precision for the coefficients if isinstance(beta, list) or type(beta) is np.ndarray: alpha = np.exp(hyp[n_lik_param:]) else: alpha = np.exp(hyp[1:]) # reparameterise the warp (WarpSinArcsinh only) if self.warp is not None and self.warp_reparam: delta = np.exp(gamma[1]) beta = beta/(delta**2) # Create precision matrix from noise precision if Xv is not None: self.lambda_n_vec = beta elif self.var_groups is not None: beta_all = np.ones(N) for v in range(len(self.var_ids)): beta_all[self.var_groups == self.var_ids[v]] = beta[v] self.lambda_n_vec = beta_all else: self.lambda_n_vec = np.ones(N)*beta return beta, alpha, gamma
[docs] def post(self, hyp, X, y, Xv=None): """ Generic function to compute posterior distribution. This function will save the posterior mean and precision matrix as self.m and self.A and will also update internal parameters (e.g. N, D and the prior covariance (Sigma_a) and precision (Lambda_a). :param hyp: hyperparameter vector :param X: covariates :param y: responses :param Xv: covariates for heteroskedastic noise """ N = X.shape[0] if len(X.shape) == 1: D = 1 else: D = X.shape[1] if (hyp == self.hyp).all() and hasattr(self, 'N'): print("hyperparameters have not changed, exiting") return beta, alpha, gamma = self._parse_hyps(hyp, X, Xv) if self.verbose: print("estimating posterior ... | hyp=", hyp) # prior variance if len(alpha) == 1 or len(alpha) == D: self.Sigma_a = np.diag(np.ones(D))/alpha self.Lambda_a = np.diag(np.ones(D))*alpha else: raise ValueError("hyperparameter vector has invalid length") # compute posterior precision and mean # this is equivalent to the following operation but makes much more # efficient use of memory by avoiding the need to store Lambda_n # # self.A = X.T.dot(self.Lambda_n).dot(X) + self.Lambda_a # self.m = linalg.solve(self.A, X.T, # check_finite=False).dot(self.Lambda_n).dot(y) XtLambda_n = X.T*self.lambda_n_vec self.A = XtLambda_n.dot(X) + self.Lambda_a invAXt = linalg.solve(self.A, X.T, check_finite=False) self.m = (invAXt*self.lambda_n_vec).dot(y) # save stuff self.N = N self.D = D self.hyp = hyp
[docs] def loglik(self, hyp, X, y, Xv=None): """ Function to compute compute log (marginal) likelihood """ # hyperparameters (alpha not needed) beta, alpha, gamma = self._parse_hyps(hyp, X, Xv) # warp the likelihood? if self.warp is not None: if self.verbose: print('warping input...') y_unwarped = y y = self.warp.f(y, gamma) # load posterior and prior covariance if (hyp != self.hyp).any() or not (hasattr(self, 'A')): try: self.post(hyp, X, y, Xv) except ValueError: print("Warning: Estimation of posterior distribution failed") nlZ = 1/np.finfo(float).eps return nlZ try: # compute the log determinants in a numerically stable way logdetA = 2*sum(np.log(np.diag(np.linalg.cholesky(self.A)))) except (ValueError, LinAlgError): print("Warning: Estimation of posterior distribution failed") nlZ = 1/np.finfo(float).eps return nlZ logdetSigma_a = sum(np.log(np.diag(self.Sigma_a))) # diagonal logdetSigma_n = sum(np.log(1/self.lambda_n_vec)) # compute negative marginal log likelihood X_y_t_sLambda_n = (y-X.dot(self.m))*np.sqrt(self.lambda_n_vec) nlZ = -0.5 * (-self.N*np.log(2*np.pi) - logdetSigma_n - logdetSigma_a - X_y_t_sLambda_n.T.dot(X_y_t_sLambda_n) - self.m.T.dot(self.Lambda_a).dot(self.m) - logdetA ) if self.warp is not None: # add in the Jacobian nlZ = nlZ - sum(np.log(self.warp.df(y_unwarped, gamma))) # make sure the output is finite to stop the minimizer getting upset if not np.isfinite(nlZ): nlZ = 1/np.finfo(float).eps if self.verbose: print("nlZ= ", nlZ, " | hyp=", hyp) self.nlZ = nlZ return nlZ
[docs] def penalized_loglik(self, hyp, X, y, Xv=None, l=0.1, norm='L1'): """ Function to compute the penalized log (marginal) likelihood :param hyp: hyperparameter vector :param X: covariates :param y: responses :param Xv: covariates for heteroskedastic noise :param l: regularisation penalty :param norm: type of regulariser (L1 or L2) """ if norm.lower() == 'l1': L = self.loglik(hyp, X, y, Xv) + l * sum(abs(hyp)) elif norm.lower() == 'l2': L = self.loglik(hyp, X, y, Xv) + l * sum(np.sqrt(hyp**2)) else: print("Requested penalty not recognized, choose between 'L1' or 'L2'.") return L
[docs] def dloglik(self, hyp, X, y, Xv=None): """ Function to compute derivatives """ # hyperparameters beta, alpha, gamma = self._parse_hyps(hyp, X, Xv) if self.warp is not None: raise ValueError('optimization with derivatives is not yet ' + 'supported for warped liklihood') # load posterior and prior covariance if (hyp != self.hyp).any() or not (hasattr(self, 'A')): try: self.post(hyp, X, y, Xv) except ValueError: print("Warning: Estimation of posterior distribution failed") dnlZ = np.sign(self.dnlZ) / np.finfo(float).eps return dnlZ # precompute re-used quantities to maximise speed # todo: revise implementation to use Cholesky throughout # that would remove the need to explicitly compute the inverse S = np.linalg.inv(self.A) # posterior covariance SX = S.dot(X.T) XLn = X.T*self.lambda_n_vec # = X.T.dot(self.Lambda_n) XLny = XLn.dot(y) SXLny = S.dot(XLny) XLnXm = XLn.dot(X).dot(self.m) # initialise derivatives dnlZ = np.zeros(hyp.shape) dnl2 = np.zeros(hyp.shape) # noise precision parameter(s) for i in range(0, len(beta)): # first compute derivative of Lambda_n with respect to beta dL_n_vec = np.zeros(self.N) if self.var_groups is None: dL_n_vec = np.ones(self.N) else: dL_n_vec[np.where(self.var_groups == self.var_ids[i])[0]] = 1 dLambda_n = np.diag(dL_n_vec) # compute quantities used multiple times XdLnX = X.T.dot(dLambda_n).dot(X) dA = XdLnX # derivative of posterior parameters with respect to beta b = -S.dot(dA).dot(SXLny) + SX.dot(dLambda_n).dot(y) # compute np.trace(self.Sigma_n.dot(dLambda_n)) efficiently trSigma_ndLambda_n = sum((1/self.lambda_n_vec)*np.diag(dLambda_n)) # compute y.T.dot(Lambda_n) efficiently ytLn = (y*self.lambda_n_vec).T # compute derivatives dnlZ[i] = - (0.5 * trSigma_ndLambda_n - 0.5 * y.dot(dLambda_n).dot(y) + y.dot(dLambda_n).dot(X).dot(self.m) + ytLn.dot(X).dot(b) - 0.5 * self.m.T.dot(XdLnX).dot(self.m) - b.T.dot(XLnXm) - b.T.dot(self.Lambda_a).dot(self.m) - 0.5 * np.trace(S.dot(dA)) ) * beta[i] # scaling parameter(s) for i in range(0, len(alpha)): # first compute derivatives with respect to alpha if len(alpha) == self.D: # are we using ARD? dLambda_a = np.zeros((self.D, self.D)) dLambda_a[i, i] = 1 else: dLambda_a = np.eye(self.D) F = dLambda_a c = -S.dot(F).dot(SXLny) # compute np.trace(self.Sigma_a.dot(dLambda_a)) efficiently trSigma_adLambda_a = sum(np.diag(self.Sigma_a)*np.diag(dLambda_a)) dnlZ[i+len(beta)] = -(0.5 * trSigma_adLambda_a + XLny.T.dot(c) - c.T.dot(XLnXm) - c.T.dot(self.Lambda_a).dot(self.m) - 0.5 * self.m.T.dot(F).dot(self.m) - 0.5*np.trace(linalg.solve(self.A, F)) ) * alpha[i] # make sure the gradient is finite to stop the minimizer getting upset if not all(np.isfinite(dnlZ)): bad = np.where(np.logical_not(np.isfinite(dnlZ))) for b in bad: dnlZ[b] = np.sign(self.dnlZ[b]) / np.finfo(float).eps if self.verbose: print("dnlZ= ", dnlZ, " | hyp=", hyp) self.dnlZ = dnlZ return dnlZ
# model estimation (optimization)
[docs] def estimate(self, hyp0, X, y, **kwargs): """ Function to estimate the model :param hyp: hyperparameter vector :param X: covariates :param y: responses :param optimizer: optimisation algorithm ('cg','powell','nelder-mead','l0bfgs-b') """ optimizer = kwargs.get('optimizer', 'cg') # covariates for heteroskedastic noise Xv = kwargs.get('var_covariates', None) # options for l-bfgs-b l = float(kwargs.get('l', 0.1)) epsilon = float(kwargs.get('epsilon', 0.1)) norm = kwargs.get('norm', 'l2') if optimizer.lower() == 'cg': # conjugate gradients out = optimize.fmin_cg(self.loglik, hyp0, self.dloglik, (X, y, Xv), disp=True, gtol=self.tol, maxiter=self.n_iter, full_output=1) elif optimizer.lower() == 'powell': # Powell's method out = optimize.fmin_powell(self.loglik, hyp0, (X, y, Xv), full_output=1) elif optimizer.lower() == 'nelder-mead': out = optimize.fmin(self.loglik, hyp0, (X, y, Xv), full_output=1) elif optimizer.lower() == 'l-bfgs-b': all_hyp_i = [hyp0] def store(X): hyp = X all_hyp_i.append(hyp) try: out = optimize.fmin_l_bfgs_b(self.penalized_loglik, x0=hyp0, args=(X, y, Xv, l, norm), approx_grad=True, epsilon=epsilon, callback=store) # If the matrix becomes singular restart at last found hyp except np.linalg.LinAlgError: print( f'Restarting estimation at hyp = {all_hyp_i[-1]}, due to *** numpy.linalg.LinAlgError: Matrix is singular.') out = optimize.fmin_l_bfgs_b(self.penalized_loglik, x0=all_hyp_i[-1], args=(X, y, Xv, l, norm), approx_grad=True, epsilon=epsilon) else: raise ValueError("unknown optimizer") self.hyp = out[0] self.nlZ = out[1] self.optimizer = optimizer return self.hyp
[docs] def predict(self, hyp, X, y, Xs, var_groups_test=None, var_covariates_test=None, **kwargs): """ Function to make predictions from the model :param hyp: hyperparameter vector :param X: covariates for training data :param y: responses for training data :param Xs: covariates for test data :param var_covariates_test: test covariates for heteroskedastic noise This always returns Gaussian predictions, i.e. :returns: * ys - predictive mean * s2 - predictive variance """ Xvs = var_covariates_test if Xvs is not None and len(Xvs.shape) == 1: Xvs = Xvs[:, np.newaxis] if X is None or y is None: # set dummy hyperparameters beta, alpha, gamma = self._parse_hyps( hyp, np.zeros((self.N, self.D)), Xvs) else: # set hyperparameters beta, alpha, gamma = self._parse_hyps(hyp, X, Xvs) # do we need to re-estimate the posterior? if (hyp != self.hyp).any() or not (hasattr(self, 'A')): raise ValueError('posterior not properly estimated') N_test = Xs.shape[0] ys = Xs.dot(self.m) if self.var_groups is not None: if len(var_groups_test) != N_test: raise ValueError('Invalid variance groups for test') # separate variance groups s2n = np.ones(N_test) for v in range(len(self.var_ids)): s2n[var_groups_test == self.var_ids[v]] = 1/beta[v] else: s2n = 1/beta # compute xs.dot(S).dot(xs.T) avoiding computing off-diagonal entries s2 = s2n + np.sum(Xs*linalg.solve(self.A, Xs.T).T, axis=1) return ys, s2
[docs] def predict_and_adjust(self, hyp, X, y, Xs=None, ys=None, var_groups_test=None, var_groups_adapt=None, **kwargs): """ Function to transfer the model to a new site. This is done by first making predictions on the adaptation data given by X, adjusting by the residuals with respect to y. :param hyp: hyperparameter vector :param X: covariates for adaptation (i.e. calibration) data :param y: responses for adaptation data :param Xs: covariate data (for which predictions should be adjusted) :param ys: true response variables (to be adjusted) :param var_groups_test: variance groups (e.g. sites) for test data :param var_groups_adapt: variance groups for adaptation data There are two possible ways of using this function, depending on whether ys or Xs is specified If ys is specified, this is applied directly to the data, which is assumed to be in the input space (i.e. not warped). In this case the adjusted true data points are returned in the same space Alternatively, Xs is specified, then the predictions are made and adjusted. In this case the predictive variance are returned in the warped (i.e. Gaussian) space. This function needs to know which sites are associated with which data points, which provided by var_groups_xxx, which is a list or array of scalar ids . """ if ys is None: if Xs is None: raise ValueError('Either ys or Xs must be specified') else: N = Xs.shape[0] else: if len(ys.shape) < 1: raise ValueError('ys is specified but has insufficent length') N = ys.shape[0] if var_groups_test is None: var_groups_test = np.ones(N) var_groups_adapt = np.ones(X.shape[0]) ys_out = np.zeros(N) s2_out = np.zeros(N) for g in np.unique(var_groups_test): idx_s = var_groups_test == g idx_a = var_groups_adapt == g if sum(idx_a) < 2: raise ValueError( 'Insufficient adaptation data to estimate variance') # Get predictions from old model on new data X ys_ref, s2_ref = self.predict(hyp, None, None, X[idx_a, :]) # Subtract the predictions from true data to get the residuals if self.warp is None: residuals = ys_ref-y[idx_a] else: # Calculate the residuals in warped space y_ref_ws = self.warp.f( y[idx_a], hyp[1:self.warp.get_n_params()+1]) residuals = ys_ref - y_ref_ws residuals_mu = np.mean(residuals) residuals_sd = np.std(residuals) # Adjust the mean with the mean of the residuals if ys is None: # make and adjust predictions ys_out[idx_s], s2_out[idx_s] = self.predict( hyp, None, None, Xs[idx_s, :]) ys_out[idx_s] = ys_out[idx_s] - residuals_mu # Set the deviation to the devations of the residuals s2_out[idx_s] = np.ones(len(s2_out[idx_s]))*residuals_sd**2 else: # adjust the data if self.warp is not None: y_ws = self.warp.f( ys[idx_s], hyp[1:self.warp.get_n_params()+1]) ys_out[idx_s] = y_ws + residuals_mu ys_out[idx_s] = self.warp.invf( ys_out[idx_s], hyp[1:self.warp.get_n_params()+1]) else: ys = ys - residuals_mu s2_out = None return ys_out, s2_out