# Copyright (c) 2016, Vladimir Feinberg
# Licensed under the BSD 3-clause license (see LICENSE)
import functools
import logging
from multiprocessing import Pool, cpu_count
import os
import numpy as np
import scipy.linalg as la
from .multigp import MultiGP
from ..approx.interpolation import multi_interpolant, autogrid
from ..approx.iterative import Iterative
from ..linalg.composition import Composition
from ..linalg.matrix import Matrix
from ..lmc.stochastic_deriv import StochasticDerivService
from ..lmc.grid_kernel import gen_grid_kernel
from ..lmc.metrics import Metrics
from ..lmc.likelihood import ExactLMCLikelihood, ApproxLMCLikelihood
from ..util.docs import inherit_doc
from ..util.numpy_convenience import cartesian_product
from ..util.inline_pool import InlinePool
_LOG = logging.getLogger(__name__)
[docs]@inherit_doc # pylint: disable=too-many-instance-attributes
class InterpolatedLLGP(MultiGP):
"""
The main class of this package, `InterpolatedLLGP` implements linearithmic
Gaussian Process learning in the multi-output case. See
the paper on `arxiv <https://arxiv.org/abs/1705.10813>`_.
.. Note: Currently, only one-dimensional input is supported.
Upon construction, this class assumes ownership of its parameters and
does not account for changes in their values.
For a dataset of inputs `Xs` across multiple outputs `Ys`, let :math:`X`
refer to the concatenation of `Xs`. According to the functional
specification of the LMC kernel by `functional_kernel` (see documentation
in :class:`runlmc.lmc.functional_kernel.FunctionalKernel`), we can create
the covariance matrix for a multi-output GP model applied to all pairs
of :math:`X`, resulting in :math:`K_{X,X}`.
The point of this class is to vary hyperparameters of :math:`K`, the
`FunctionalKernel` given by `functional_kernel`, until the model log
likelihood is as large as possible.
This class uses the SKI approximation to do this efficiently,
which shares a single grid
:math:`U` as the input array for all the outputs. Then,
:math:`K_{X,X}` is interpolated from the approximation kernel
:math:`K_{\\text{SKI}}`, as directed in
*Thoughts on Massively Scalable Gaussian Processes* by Wilson, Dann,
and Nickisch. This is done with sparse interpolation matrices :math:`W`.
.. math::
K_{X,X}\\approx K_{\\text{SKI}} = W K_{U,U} W^\\top +
\\boldsymbol\\epsilon I
Above, :math:`K_{U,U}` is a structured kernel over a grid :math:`U`. This
grid is specified by `lo,hi,m`.
The functionality for the various prediction modes is summarized below.
* `'on-the-fly'` - Use matrix-free inversion to compute the covariance \
for the entire set of points on which we're predicting. This means that \
variance prediction take :math:`O(n \log n)` time per test point, where \
`Xs` has :math:`n` datapoints total. This should be preferred for small \
test sets.
* `'precompute'` - Compute an auxiliary predictive variance matrix for \
the grid points, but then cheaply re-use that work for prediction. This \
is an up-front :math:`O(n^2 \log n)` payment for :math:`O(1)` predictive \
variance afterwards per test point. This is not available if using \
split kernels (i.e., different active dimensions for different kernels).
* `'exact'` - Use the exact cholesky-based algorithm (not matrix free), \
:math:`O(n^3)` runtime up-front and then :math:`O(n^2)` per query.
Note `'on-the-fly', 'precompute'` can be parallelized by the number
of test points and training points, respectively.
:param Xs: input observations, should be a list of numpy arrays,
where each numpy array is a design matrix for the inputs to
output :math:`i`. If the :math:`i`-th input has :math:`n_i`
data points, then this matrix can be :math:`n_i` or
:math:`n_i\\times P` shape for input dimension :math:`P`,
with the former re-interpreted as :math:`P=1`.
:param Ys: output observations, this must be a list of one-dimensional
numpy arrays, matching up with the number of rows in `Xs`.
:param normalize: optional normalization for outputs `Ys`.
Prediction will be un-normalized.
:param lo: lexicographically smallest point in inducing point grid used
(by default, a bit less than the minimum of input). For
multidimensional inputs this should be a vector.
:param hi: lexicographically largest point in inducing point grid used
(by default, a bit more than the maximum of input). For
multidimensional inputs this should be a vector.
:param m: number of inducing points to use. For multidimensional inputs
this should be
a vector indicating how many grid points there should be
along each dimension. The total number of points used is then
`np.prod(m)`. By default, `m` is a constant array of dimension
:math:`P`, the input dimension, of size equal to the average
input sequence length.
:param str name:
:param metrics: whether to record optimization metrics during optimization
(runs exact solution alongside this one, may be slow).
:param prediction: one of `'matrix-free'`, `'on-the-fly'`,
`'precompute'`, `'exact'`, `'sample'`.
:param max_procs: maximum number of processes to use for parallelism,
defaults to cpu count.
:param functional_kernel: a
:class:`runlmc.lmc.functional_kernel.FunctionalKernel` determining
:math:`K`.
:param trace_iterations: number of iterations to be used in approximate
trace algorithm.
:raises: :class:`ValueError` if `Xs` and `Ys` lengths do not match.
:raises: :class:`ValueError` if normalization if any `Ys` have no variance
or values in `Xs` have multiple identical
values.
:ivar metrics: the :class:`runlmc.lmc.metrics.Metrics` instance associated
with the model
"""
def __init__(self, Xs, Ys, normalize=True, # pylint: disable=too-many-arguments,dangerous-default-value,too-many-locals
lo=None, hi=None, m=None, name='lmc',
metrics=False, prediction='on-the-fly', max_procs=None,
trace_iterations=15, tolerance=1e-4,
functional_kernel=None):
super().__init__(Xs, Ys, normalize=normalize, name=name)
self.update_model(False)
if not functional_kernel:
raise ValueError('functional_kernel must be provided')
self.prediction = prediction
if prediction not in self._prediction_methods():
raise ValueError('Variance prediction method {} unrecognized'
.format(prediction))
self._functional_kernel = functional_kernel
self._functional_kernel.set_input_dim(self.input_dim)
self.link_parameter(self._functional_kernel)
self.y = np.hstack(self.Ys)
self.kernel = None
# We need a grid for each subset of active dimensions.
self.dists, self.interpolants, self.grid_axes = {}, {}, {}
self._generate_grids(lo, hi, m)
self.update_model(True)
self.metrics = Metrics() if metrics else None
self._pool = InlinePool(self._generate_pool(max_procs))
self._deriv_service = StochasticDerivService(
self.metrics, self._pool, trace_iterations, tolerance)
_LOG.info('InterpolatedLLGP %s fully initialized', self.name)
EVAL_NORM = np.inf
def _generate_pool(self, max_procs):
self._check_omp(max_procs)
max_procs = cpu_count() if max_procs is None else max_procs
if max_procs == 1 or len(self.y) < 300:
_LOG.info('InterpolatedLLGP (%d hyperparams) will run serially',
len(self.param_array))
return None
_LOG.info('InterpolatedLLGP (%d hyperparams) with %d workers',
len(self.param_array), max_procs)
return Pool(processes=max_procs)
def _check_omp(self, procs_requested):
omp = os.environ.get('OMP_NUM_THREADS', None)
procs_info = 'InterpolatedLLGP(max_procs={})'.format(procs_requested)
if procs_requested is None:
procs_info += ' [defaults to {}]'.format(cpu_count())
if omp is None and (procs_requested or cpu_count()) > 1:
_LOG.warning('Parallelizing at the process level with %s is'
' incompatible with OMP-level parallelism '
'(OMP_NUM_THREADS env var is unset, using all '
'available cores)', procs_info)
[docs] def optimize(self, **kwargs):
if self.metrics is not None:
self.metrics = Metrics()
self._deriv_service.metrics = self.metrics
super().optimize(**kwargs)
[docs] def parameters_changed(self):
self._clear_caches()
grid_kernel, gk_dict = gen_grid_kernel(
self._functional_kernel,
self.dists,
self.interpolants,
list(map(len, self.Ys)))
self.kernel = ApproxLMCLikelihood(
self._functional_kernel,
grid_kernel,
self.dists,
self.interpolants,
self.Ys,
self._deriv_service)
self.kernel._grid_kernels = gk_dict
if _LOG.isEnabledFor(logging.DEBUG):
fmt = '{:7.6e}'.format
def np_print(x):
return np.array2string(np.copy(x), formatter={'float': fmt})
_LOG.debug('Parameters changed')
_LOG.debug('log likelihood %f', self.log_likelihood())
_LOG.debug('normal quadratic %f', self.normal_quadratic())
_LOG.debug('log det K %f', self.log_det_K())
_LOG.debug('noise %s', np_print(self._functional_kernel.noise))
_LOG.debug('coreg vecs')
for i, a in enumerate(self._functional_kernel.coreg_vecs):
_LOG.debug(' a%d %s', i, np_print(a))
_LOG.debug('coreg diags')
for i, a in enumerate(self._functional_kernel.coreg_diags):
_LOG.debug(' kappa%d %s', i, np_print(a))
self._functional_kernel.update_gradient(self.kernel)
if self.metrics is not None:
approx = self.kernel
exact = self._dense()
approx_grad, exact_grad = (
np.concatenate((
np.concatenate(
k.coreg_vec_gradients()).reshape(-1),
np.concatenate(k.coreg_diags_gradients()),
np.concatenate(k.kernel_gradients()),
k.noise_gradient()))
for k in [approx, exact])
approx_norm = la.norm(approx_grad, self.EVAL_NORM)
exact_norm = la.norm(exact_grad, self.EVAL_NORM)
diff_norm = la.norm(approx_grad - exact_grad, self.EVAL_NORM)
self.metrics.grad_norms.append(approx_norm)
self.metrics.grad_error.append(diff_norm / exact_norm)
self.metrics.log_likely.append(self.log_likelihood())
@functools.lru_cache(maxsize=1)
def _dense(self):
return ExactLMCLikelihood(
self._functional_kernel, self.Xs, self.Ys)
[docs] def K(self):
"""
.. warning:: This generates the entire kernel, a quadratic operation
in memory and time.
:returns: :math:`K_{\\text{SKI}}`, the approximation of the exact
kernel.
"""
return self._dense().K
[docs] def log_det_K(self):
"""
:returns: an upper bound of the approximate log determinant,
uses :math:`K_\\text{SKI}` to find an approximate
upper bound for
:math:`\\log\\det K_{\text{exact}}`
"""
diag = np.diag(self._dense().L[0])
lgdet = np.log(diag).sum() * 2
sgn = np.sign(diag).prod()
if sgn <= 0:
_LOG.critical('Log determinant nonpos! sgn %f lgdet %f '
'returning -inf', sgn, lgdet)
return -np.inf
return lgdet
[docs] def normal_quadratic(self):
"""
If the flattened (Stacked)outputs are written as :math:`\\textbf{y}`,
this returns :math:`\\textbf{y}^\\top K_{\\text{SKI}}^{-1}\\textbf{y}`.
:returns: the normal quadratic term for the current outputs `Ys`.
"""
return self.y.dot(self.kernel.alpha())
[docs] def log_likelihood(self):
nll = self.log_det_K() + self.normal_quadratic()
nll += len(self.y) * np.log(2 * np.pi)
return -0.5 * nll
# Predictive mean for grid points U
@functools.lru_cache(maxsize=1)
def _grid_alpha(self):
grid_alpha = {}
for active_dim in self._functional_kernel.active_dims:
_, WT = self.interpolants[active_dim]
grid_K = self.kernel._grid_kernels[active_dim].grid_K
grid_alpha[active_dim] = grid_K.matvec(WT.dot(self.kernel.alpha()))
return grid_alpha
# The native covariance diag(K) for each output, i.e.,
# the a priori variance of a single point for each output.
@functools.lru_cache(maxsize=1)
def _native_variance(self):
coregs = np.column_stack(np.square(per_output).sum(axis=0)
for per_output
in self._functional_kernel.coreg_vecs)
coregs += np.column_stack(self._functional_kernel.coreg_diags)
zero_dist = {v: 0 for v in self._functional_kernel.active_dims}
kernels = self._functional_kernel.eval_kernels(zero_dist)
native_output_var = coregs.dot(kernels).reshape(-1)
native_var = native_output_var + self._functional_kernel.noise
return native_var
# TODO(test) prediction testing
def _prediction_methods(self):
return {
'on-the-fly': self._var_predict_on_the_fly,
'precompute': self._var_predict_precompute,
'exact': self._var_predict_exact,
}
def _raw_predict(self, Xs):
lens = [len(X) for X in Xs]
if self.kernel is None:
self.parameters_changed()
grid_alpha = self._grid_alpha()
native_variance = self._native_variance()
mean = np.zeros(sum(lens))
prediction_W = {}
for active_dim, grid_alpha_ad in grid_alpha.items():
Xs_active = [X[:, active_dim] for X in Xs]
W = multi_interpolant(Xs_active, *self.grid_axes[active_dim])
prediction_W[active_dim] = W
mean += W.dot(grid_alpha_ad)
native_variance = np.repeat(native_variance, lens)
explained_variance = self._prediction_methods()[self.prediction](
prediction_W, Xs)
var = native_variance - explained_variance
var[var < 0] = 0
endpoints = np.add.accumulate(lens)[:-1]
return np.split(mean, endpoints), np.split(var, endpoints)
def _var_predict_exact(self, _, Xs):
exact = self._dense()
K_test_X = ExactLMCLikelihood.kernel_from_indices(
Xs, self.Xs, self._functional_kernel)
var_explained = K_test_X.dot(la.cho_solve(exact.L, K_test_X.T))
return np.diag(var_explained)
@staticmethod
def _var_solve(i, K, K_XU, K_UX):
x = np.zeros(K_XU.shape[1])
x[i] = 1
x = K_XU.matvec(x)
x = Iterative.solve(K, x)
x = K_UX.matvec(x)
return x[i]
@functools.lru_cache(maxsize=1)
def _precomputed_nu(self):
if len(self.interpolants) != 1:
raise ValueError(
'precompute prediction mode unavailable for split kernels')
W, WT = self.interpolants[tuple(range(self.input_dim))]
K_UU = self.kernel.K.Ks[0].grid_K
# SKI should have a half_matrix() method returning these two,
# in which case we'd just ask self.kernel.K.ski for them.
K_XU = Composition([Matrix.wrap(W.shape, W.dot), K_UU])
K_UX = Composition([K_UU, Matrix.wrap(WT.shape, WT.dot)])
Dm = K_XU.shape[1]
ls = [(i, self.kernel.K, K_XU, K_UX) for i in range(Dm)]
nu = self._pool.starmap(InterpolatedLLGP._var_solve, ls)
return nu
def _var_predict_precompute(self, prediction_interpolants, _):
nu = self._precomputed_nu()
assert len(prediction_interpolants) == 1
W = list(prediction_interpolants.values()).pop()
return W.dot(nu)
def _var_predict_on_the_fly(self, _, Xs):
K_test_X = ExactLMCLikelihood.kernel_from_indices(
Xs, self.Xs, self._functional_kernel)
ls = [(self.kernel.K, k_star) for k_star in K_test_X]
inverted = np.array(self._pool.starmap(Iterative.solve, ls)).T
full_mat = K_test_X.dot(inverted)
return np.diag(full_mat)
def _clear_caches(self):
self._dense.cache_clear()
self._grid_alpha.cache_clear()
self._native_variance.cache_clear()
self._precomputed_nu.cache_clear()
@staticmethod
def _wrap(x, active_dims):
if x is None:
return None
n = np.asarray(x)
if not n.shape:
assert len(active_dims) == 1, len(active_dims)
return n.reshape(1)
return n[list(active_dims)]
def _generate_grids(self, lo, hi, m):
n = sum(map(len, self.Xs))
for active_dim in self._functional_kernel.active_dims.keys():
# Grid corresponds to U
wlo, whi, wm = (self._wrap(x, active_dim) for x in (lo, hi, m))
Xs = [X[:, active_dim] for X in self.Xs]
self.grid_axes[active_dim] = autogrid(Xs, wlo, whi, wm)
grid = cartesian_product(*self.grid_axes[active_dim])
first = grid[0]
grid_shape = list(map(len, self.grid_axes[active_dim]))
grid_shape.append(len(self.grid_axes[active_dim]))
grid = grid.reshape(grid_shape)
# BTTB(dists.ravel(), dists.shape)
# is the pairwise distance matrix of U
self.dists[active_dim] = la.norm(grid - first, axis=-1)
# Corresponds to W; block diagonal matrix.
interpolant = multi_interpolant(Xs, *self.grid_axes[active_dim])
interpolantT = interpolant.transpose().tocsr()
self.interpolants[active_dim] = (interpolant, interpolantT)
_LOG.info('InterpolatedLLGP %s generated grid (n = %d, m = %d) '
'for active dimensions %s',
self.name, n,
np.prod([len(g) for g in self.grid_axes[active_dim]]),
str(active_dim))