# Copyright (c) 2016, Vladimir Feinberg
# Licensed under the BSD 3-clause license (see LICENSE)
"""
The following methods are useful for generating various matrices for testing.
"PSDT" stands for positive semi-definite Toeplitz (and, implicitly, symmetric).
"""
from contextlib import contextmanager
import os
import random
import time
import sys
import unittest
import numpy as np
from paramz.optimization import Optimizer
import scipy.linalg as la
from ..linalg.kronecker import Kronecker
from ..linalg.sum_matrix import SumMatrix
from ..linalg.toeplitz import Toeplitz
from ..linalg.numpy_matrix import NumpyMatrix
from ..parameterization.model import Model
from .numpy_convenience import smallest_eig
[docs]class RandomTest(unittest.TestCase):
"""
This test case sets the random seed to be based on the time
that the test is run.
If there is a `SEED` variable in the enviornment, then this is used as the
seed.
Sets both random and numpy.random.
Prints the seed to stdout before running each test case.
"""
[docs] def setUp(self):
super().setUp()
seed = os.getenv("SEED")
if seed is None:
seed = int(time.time() * 37 + os.getpid())
self.seed = np.array([seed]).astype(np.uint32)[0]
print('Random test using SEED={}'.format(self.seed))
random.seed(self.seed)
np.random.seed(self.seed)
[docs]def poor_cond_toep(n):
"""
:param n: size of output
:returns: the top row of a randomly scaled PSDT matrix whose
:math:`L^2` condition number scales exponentially with `n`
"""
top = np.arange(n)[::-1] * (np.random.rand() + 1e-3)
while smallest_eig(top) < 0:
top[0] *= 2
return top
[docs]def random_toep(n):
"""
:returns: top row of a random PSDT matrix of size `n`.
"""
top = np.abs(np.random.rand(n))
top[::-1].sort()
while smallest_eig(top) < 0:
top[0] += 1
return top
[docs]def exp_decr_toep(n):
"""
:returns: top row of a PSDT matrix of size `n` with terms
exponentially decreasing in distance from the main diagonal;
the rate of which is randomly generated but at least
:math:`e`.
"""
return np.exp(-np.random.rand() * np.arange(n))
[docs]def run_main(f, help_str):
"""
A helper function which is re-used in setting up benchmarks for kernels
that are shaped in a specific manner; namely, sums of Kronecker products
of small dense and large Toeplitz matrices.
This function reads in command-line arguments and executes a simple
benchmarking script.
:param f: benchmarking function to pass generated inputs with
user-specified parameters to.
:param help_str: a help-string to be printed when the user does not
call a correct invocation of the program.
"""
if len(sys.argv) not in [5, 6]:
print('Usage: python logdet.py n d q eps [seed]')
print()
print('n > 8 is the size of the Toeplitz submatrix')
print('d > 0 is the size of the dense submatrix')
print('q > 0 is the number of dense-Toeplitz Kronecker products')
print(' to sum together for the system')
print('eps >= 0 is the constant diagonal perturbation (a float)')
print(' added in (higher eps -> better conditioning).')
print('default seed is 1234')
print()
print(help_str)
print()
print('Choose q = d = 1 and n large to test Toeplitz, mainly')
print('Choose q = 1 and n ~ d^2 > 1 to test Kronecker, mainly')
sys.exit(1)
n = int(sys.argv[1])
d = int(sys.argv[2])
q = int(sys.argv[3])
eps = float(sys.argv[4])
seed = int(sys.argv[5]) if len(sys.argv) > 5 else 1234
assert n > 8
assert d > 0
assert q > 0
assert eps >= 0
np.random.seed(seed)
print('size q {} n {} d {} eps {:g}'.format(q, n, d, eps))
cases = [
('random (well-cond) ', random_toep),
('linear decrease (poor-cond)', poor_cond_toep),
('exponentially decreasing (realistic)', exp_decr_toep)]
for name, generator in cases:
print(name)
dense_mats = [rand_pd(d) for _ in range(q)]
toep_tops = [generator(n) for _ in range(q)]
my_mat = SumMatrix([Kronecker(NumpyMatrix(dense), Toeplitz(top))
for dense, top in zip(dense_mats, toep_tops)])
# added noise
my_mat.orig_matvec = my_mat.matvec
my_mat.matvec = lambda x: my_mat.orig_matvec( # pylint:disable=cell-var-from-loop
x) + eps * x
my_mat.logdet = lambda: np.log(my_mat.approx_eigs( # pylint:disable=cell-var-from-loop
0) + eps).sum()
f(my_mat)
[docs]def rand_pd(n):
"""
:returns: a random `n` by `n` symmetric PSD matrix with positive entries
"""
A = np.random.rand(n, n)
A = (A + A.T) / 2
D = np.diag(np.fabs(A).sum(axis=1) + 1)
return A + D
[docs]def check_np_lists(a, b, atol=1e-7, rtol=1e-7):
"""
Verifies that two lists of numpy arrays are all close.
:param a:
:param b:
"""
assert len(a) == len(b), 'a {} b {}'.format(len(a), len(b))
for i, (sub_a, sub_b) in enumerate(zip(a, b)):
np.testing.assert_allclose(
sub_a, sub_b, err_msg='output {}'.format(i), atol=atol, rtol=rtol)
[docs]class SingleGradOptimizer(Optimizer):
def __init__(self, lipschitz=1):
super().__init__()
self.gradient_observed = None
self.L = lipschitz
[docs] def opt(self, x_init, f_fp=None, f=None, fp=None):
# 1 iteration only
# Note we save the negative of the gradient, since
# the Model class will implicitly flip the objective's sign
# to make the likelihood maximization into a minimization problem.
self.gradient_observed = -fp(x_init)
self.x_opt = x_init + self.gradient_observed / self.L
[docs]class BasicModel(Model):
def __init__(self, dists, Y, kern):
super().__init__('single-kern')
self.link_parameter(kern)
self.dists = dists
self.kern = kern
self.Y = Y
[docs] def log_likelihood(self):
K_top = self.kern.from_dist(self.dists)
KinvY = la.solve_toeplitz(K_top, self.Y)
# Prevent slight negative eigenvalues from roundoff.
sign, logdet = np.linalg.slogdet(
la.toeplitz(K_top) + 1e-10 * np.identity(len(K_top)))
print(self.dists)
print(K_top)
assert sign > 0, (sign, logdet)
return -0.5 * self.Y.dot(KinvY) - 0.5 * logdet
[docs] def parameters_changed(self):
# maximize -0.5 * (y . K^-1 y) - 0.5 log |K|
# gradient wrt t is 0.5 tr((a a^T - K^-1)dK/dt), a = K^-1 a
K_top = self.kern.from_dist(self.dists)
a = la.solve_toeplitz(K_top, self.Y)
all_grad = self.kern.kernel_gradient(self.dists)
likelihood_grad = np.zeros(len(all_grad))
for i, grad in enumerate(all_grad):
dKdt = la.toeplitz(grad)
Kinv_dKdt = la.solve_toeplitz(K_top, dKdt)
aaT_dKdt = np.outer(a, dKdt.dot(a))
trace = np.trace(aaT_dKdt - Kinv_dKdt)
likelihood_grad[i] = 0.5 * trace
self.kern.update_gradient(likelihood_grad)
[docs]@contextmanager
def error_context(s):
try:
yield
except AssertionError as e:
e.args = (e.args[0] + '\n' + s,) + e.args[1:]
raise e