Source code for rfa

from __future__ import print_function
from __future__ import division

import numpy as np
import torch


class GPRRFA:
    """Random Feature Approximation for Gaussian Process Regression

    Estimation and prediction of Bayesian linear regression models

    Basic usage::

        R = GPRRFA()
        hyp = R.estimate(hyp0, X, y)
        ys, s2 = R.predict(hyp, X, y, Xs)

    where the variables are

    :param hyp: vector of hyperparameters
    :param X: N x D data array
    :param y: 1D array of targets (length N)
    :param Xs: Nte x D array of test cases
    :param hyp0: starting estimates for hyperparameter optimisation

    :returns: * ys - predictive mean
              * s2 - predictive variance

    The hyperparameters are::

        hyp = [ log(sn), log(ell), log(sf) ]  # hyp is a numpy array

    where sn^2 is the noise variance, ell are lengthscale parameters and
    sf^2 is the signal variance. This provides an approximation to the
    covariance function::

        k(x,z) = x'*z + sf2*exp(-0.5*(x-z)'*Lambda*(x-z))

    where Lambda = diag((ell_1^2, ... ell_D^2))

    Written by A. Marquand
    """

    def __init__(self, hyp=None, X=None, y=None, n_feat=None,
                 n_iter=100, tol=1e-3, verbose=False):
        self.hyp = np.nan
        self.nlZ = np.nan
        self.tol = tol          # not used at present
        self.Nf = n_feat
        self.n_iter = n_iter
        self.verbose = verbose
        self._n_restarts = 5

        if (hyp is not None) and (X is not None) and (y is not None):
            self.post(hyp, X, y)

    def _numpy2torch(self, X, y=None, hyp=None):
        if type(X) is torch.Tensor:
            pass
        elif type(X) is np.ndarray:
            X = torch.from_numpy(X)
        else:
            raise ValueError('Unknown data type (X)')
        X = X.double()

        if y is not None:
            if type(y) is torch.Tensor:
                pass
            elif type(y) is np.ndarray:
                y = torch.from_numpy(y)
            else:
                raise ValueError('Unknown data type (y)')

            if len(y.shape) == 1:
                y.resize_(y.shape[0], 1)
            y = y.double()

        if hyp is not None:
            if type(hyp) is torch.Tensor:
                pass
            else:
                hyp = torch.tensor(hyp, requires_grad=True)

        return X, y, hyp
    def get_n_params(self, X):
        return X.shape[1] + 2
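
    # A hypothetical illustration (not part of the original module): for an
    # N x D input array there are D + 2 hyperparameters, laid out as
    # [log(sn), log(ell_1), ..., log(ell_D), log(sf)], so a neutral starting
    # point for the optimisation is:
    #
    #   hyp0 = np.zeros(R.get_n_params(X))   # all parameters start at log(1)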
    def post(self, hyp, X, y):
        """ Generic function to compute posterior distribution.

            This function will save the posterior mean and precision matrix
            as self.m and self.A and will also update internal parameters
            (e.g. N, D and the prior covariance (Sigma) and precision
            (iSigma)).
        """

        # make sure all variables are the right type
        X, y, hyp = self._numpy2torch(X, y, hyp)

        self.N, self.Dx = X.shape

        # ensure the number of features is specified (use 75% as a default)
        if self.Nf is None:
            self.Nf = int(0.75 * self.N)

        # sample random frequencies: omega_f ~ N(0, diag(exp(hyp[1:-1])^2))
        self.Omega = torch.zeros((self.Dx, self.Nf), dtype=torch.double)
        for f in range(self.Nf):
            self.Omega[:, f] = torch.exp(hyp[1:-1]) * \
                torch.randn((self.Dx, 1), dtype=torch.double).squeeze()

        XO = torch.mm(X, self.Omega)
        self.Phi = torch.exp(hyp[-1]) / np.sqrt(self.Nf) * \
            torch.cat((torch.cos(XO), torch.sin(XO)), 1)

        # concatenate linear weights
        self.Phi = torch.cat((self.Phi, X), 1)
        self.D = self.Phi.shape[1]

        if self.verbose:
            print("estimating posterior ... | hyp=", hyp)

        # posterior precision and mean, with noise variance sn2 = exp(2*hyp[0]);
        # on PyTorch >= 1.13, torch.solve(B, A)[0] corresponds to
        # torch.linalg.solve(A, B)
        self.A = torch.mm(torch.t(self.Phi), self.Phi) / torch.exp(2*hyp[0]) + \
            torch.eye(self.D, dtype=torch.double)
        self.m = torch.mm(torch.solve(torch.t(self.Phi), self.A)[0], y) / \
            torch.exp(2*hyp[0])

        # save hyperparameters
        self.hyp = hyp

        # update optimizer iteration count
        if hasattr(self, '_iterations'):
            self._iterations += 1
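
    # A sketch (not part of the original module) of why the random features
    # in post() approximate the kernel in the class docstring: with
    # omega_f ~ N(0, Lambda), Lambda = diag(ell^2), Bochner's theorem gives
    #
    #   sf2/Nf * sum_f [ cos(o_f'x)cos(o_f'z) + sin(o_f'x)sin(o_f'z) ]
    #     = sf2/Nf * sum_f cos(o_f'(x - z))
    #    ~= sf2 * exp(-0.5*(x-z)'*Lambda*(x-z))   as Nf grows,
    #
    # so Phi*Phi' (plus the linear term X*X') is a Monte Carlo estimate of
    # the kernel matrix, e.g. for a fitted model R (hypothetical check):
    #
    #   K_hat = torch.mm(R.Phi, torch.t(R.Phi)).detach().numpy()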
    def loglik(self, hyp, X, y):
        """ Function to compute the (negative) log marginal likelihood """

        X, y, hyp = self._numpy2torch(X, y, hyp)

        # always recompute the posterior
        self.post(hyp, X, y)

        try:
            # compute the log determinant in a numerically stable way
            logdetA = 2 * \
                torch.sum(torch.log(torch.diag(torch.cholesky(self.A))))
        except Exception as e:
            print("Warning: Estimation of posterior distribution failed")
            print(e)
            nlZ = torch.tensor(np.nan)
            self._optim_failed = True
            return nlZ

        # compute negative marginal log likelihood
        nlZ = -0.5 * (self.N*torch.log(1/torch.exp(2*hyp[0])) -
                      self.N*np.log(2*np.pi) -
                      torch.mm(torch.t(y - torch.mm(self.Phi, self.m)),
                               (y - torch.mm(self.Phi, self.m))) /
                      torch.exp(2*hyp[0]) -
                      torch.mm(torch.t(self.m), self.m) - logdetA)

        if self.verbose:
            print("nlZ= ", nlZ, " | hyp=", hyp)

        # save marginal likelihood
        self.nlZ = nlZ

        return nlZ
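
    # In the notation of the class docstring, loglik() above evaluates the
    # standard weight-space evidence for Bayesian linear regression with a
    # unit Gaussian prior on the weights (a restatement, not original code):
    #
    #   nlZ = 0.5 * ( N*log(2*pi) + N*log(sn2)
    #                 + ||y - Phi*m||^2 / sn2 + m'*m + log|A| )
    #
    # where m and A are the posterior mean and precision computed by post().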
    def dloglik(self, hyp, X, y):
        """ Function to compute derivatives """

        print("derivatives not available")
        return
    def estimate(self, hyp0, X, y, optimizer='lbfgs'):
        """ Function to estimate the model """

        if type(hyp0) is torch.Tensor:
            hyp = hyp0
            hyp0.requires_grad_()
        else:
            hyp = torch.tensor(hyp0, requires_grad=True)
        # save the starting values
        self.hyp0 = hyp

        if optimizer.lower() == 'lbfgs':
            opt = torch.optim.LBFGS([hyp])
        else:
            raise ValueError("Optimizer " + optimizer + " not implemented")
        self._iterations = 0

        def closure():
            opt.zero_grad()
            nlZ = self.loglik(hyp, X, y)
            if not torch.isnan(nlZ):
                nlZ.backward()
            return nlZ

        for r in range(self._n_restarts):
            self._optim_failed = False
            nlZ = opt.step(closure)
            if self._optim_failed:
                print("optimization failed. retrying (", r+1, "of",
                      self._n_restarts, ")")
                hyp = torch.randn_like(hyp, requires_grad=True)
                self.hyp0 = hyp
                # rebuild the optimizer so it tracks the new tensor
                opt = torch.optim.LBFGS([hyp])
            else:
                print("Optimization complete after", self._iterations,
                      "evaluations. Function value =",
                      nlZ.detach().numpy().squeeze())
                break

        return self.hyp.detach().numpy()
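
    # A minimal usage sketch for estimate() (assumptions: X is an N x D
    # numpy array and y a length-N numpy array, as in the class docstring):
    #
    #   R = GPRRFA(n_feat=200)
    #   hyp = R.estimate(np.zeros(R.get_n_params(X)), X, y)
    #
    # On failure LBFGS is restarted from random hyperparameters, up to
    # self._n_restarts times; the optimised vector is returned as numpy.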
    def predict(self, hyp, X, y, Xs):
        """ Function to make predictions from the model """

        X, y, hyp = self._numpy2torch(X, y, hyp)
        Xs, *_ = self._numpy2torch(Xs)

        # recompute the posterior if any hyperparameter has changed or no
        # posterior has been computed yet
        if (hyp != self.hyp).any() or not hasattr(self, 'A'):
            self.post(hyp, X, y)

        # generate prediction tensors
        XsO = torch.mm(Xs, self.Omega)
        Phis = torch.exp(hyp[-1]) / np.sqrt(self.Nf) * \
            torch.cat((torch.cos(XsO), torch.sin(XsO)), 1)
        # add linear component
        Phis = torch.cat((Phis, Xs), 1)

        ys = torch.mm(Phis, self.m)

        # compute diag(Phis*(A\Phis')) avoiding computing off-diagonal entries
        s2 = torch.exp(2*hyp[0]) + \
            torch.sum(Phis * torch.t(torch.solve(torch.t(Phis), self.A)[0]), 1)

        # return output as numpy arrays
        return ys.detach().numpy().squeeze(), s2.detach().numpy().squeeze()
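

# A minimal end-to-end usage sketch (not part of the original module). The
# data below are synthetic and the feature count is an arbitrary choice;
# note that torch.solve/torch.cholesky above require an older PyTorch.
if __name__ == "__main__":
    np.random.seed(0)
    N, D = 100, 2
    X = np.random.randn(N, D)
    y = np.sin(X[:, 0]) + 0.1 * np.random.randn(N)
    Xs = np.random.randn(20, D)

    R = GPRRFA(n_feat=50)
    hyp0 = np.zeros(R.get_n_params(X))   # [log(sn), log(ell_1..D), log(sf)]
    hyp = R.estimate(hyp0, X, y)
    ys, s2 = R.predict(hyp, X, y, Xs)    # predictive mean and variance
    print("ys[:5] =", ys[:5])
    print("s2[:5] =", s2[:5])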