import cython import multiprocessing import numpy as np import pywt import re import scipy.stats import time N = 1000 np.random.seed(0) class Model: """Represents a semimartingale model.""" def Z(self, x): """Returns the normalised observations Z_i.""" return (self.Y(x) - self.mu(x)) / self.sigma(x) def K(self, x): """Returns the quantity 2^J.""" n = len(x) - 1 J = np.floor(np.log2(n) / 2.0) return 2.0**J def alpha(self, x): """Returns sqrt(n) times the scaling coefficients alpha_{J,k}.""" Z = self.Z(x) n = len(Z) t = np.arange(n) / float(n) K = self.K(x) s = scipy.stats.binned_statistic(t, Z, 'sum', K, [(0, 1)])[0] return s * np.sqrt(K / n) def stat(self, x): """Returns the test statistic T.""" alpha = self.alpha(x) coefs = pywt.wavedec(alpha, 'haar') return max(np.abs(v).max() for v in coefs) def p_value(self, x, method='stat', k=N): """Returns a p-value for the test statistic T.""" def gen(): xp = self.sim(len(x) - 1, x[0]) return self.fitted(xp, method=method) T = getattr(self, method)(x) r = sum(gen() < T for i in range(k)) return 1 - r / float(k + 1) def fitted(self, x, method='p_value'): """Fits the model parameters, then calls the requested function on the fitted model. """ theta = self.theta_hat(x) return getattr(self.__class__(*theta), method)(x) class VolModel(Model): """Represents a local volatility model.""" @staticmethod def Y(x): """Returns the observations Y.""" dx = np.diff(x) return len(dx) * dx**2 def sigma(self, x): """Returns the std. dev. function sigma.""" return np.sqrt(2) * self.mu(x) class DP(VolModel): """Represents a local volatility model from DP 2008.""" def __init__(self, drift, vol, theta=1.0): """Sets the model parameters.""" self._drift = drift self._vol = vol self._theta = theta def sim(self, n, x0=1, k=1): """Simulates n increments of the process over [0, 1], started from x0.""" m = n * k db = np.sqrt(self._theta / m) * np.random.randn(m) t = np.arange(m) / float(m) x = np.empty(m + 1) x[0] = x0 cython.inline(""" #cython: boundscheck=False, wraparound=False, cdivision=True cdef extern from "math.h": double sin(double x) double exp(double x) double fabs(double x) double sqrt(double x) double pow(double a, double b) cdef double [:] X = x, T = t, DB = db cdef int M = m, i cdef double b, s for i in range(M): b = %s s = %s X[i+1] = X[i] + b / M + s * DB[i] """ % tuple(re.sub('(X|T)', r'\1[i]', s) for s in (self._drift, self._vol))) return x[0::k] def vol(self, x): """Returns the volatility process.""" n = len(x) X = x[:-1] T = np.arange(n) / float(n) from numpy import sin, exp, sqrt, abs as fabs, power as pow return eval(self._vol) def mu(self, x): """Returns the mean function mu.""" return self._theta * self.vol(x) ** 2 def theta_hat(self, x): """Returns a linear least-squares estimate of the parameter sigma, ignoring drift. """ n = len(x) - 1 X = self.vol(x) ** 2 Y = n * np.diff(x) ** 2 theta = np.mean(X * Y) / np.mean(X ** 2) return (self._drift, self._vol, theta) def p_values(null, true, x0, n=200, m=N): """Generate sample p-vallues for testing the model null against data from the model true, started from x0. Simulate using m samples of n increments. """ def gen(): x = true.sim(n, x0) return null.fitted(x) return np.array([gen() for i in range(m)]) def rp(n, nd, nv, ad, av): """Compute a column set for the rejection probability table.""" h0 = DP(nd, nv) for drift in ad: for vol in av: h1 = DP(drift, vol) pvals = p_values(h0, h1, 1, n=n) yield (np.mean(pvals <= 0.05), np.mean(pvals <= 0.1)) def rpn(args): """Generate a section of the rejection probability table.""" heading = r""" \addlinespace[1em] \multicolumn{7}{l}{\em %s}\\""" row = ' & '.join([r'\quad \(%s = %s\)'] + ['%.3f'] * 6) + r' \\' caption, nd, nv, ad, av, var, vals = args ns = [100, 200, 500] cols = np.array([list(rp(n, nd, nv, ad, av)) for n in ns]) cols = cols.transpose(1, 0, 2).reshape(len(vals), -1) s = ([heading % caption] + [row % ((var, val) + tuple(col)) for val, col in zip(vals, cols)]) return '\n'.join(s) def main(): """Generate the rejection probability table.""" header = r""" \begin{table} \centering \begin{tabular}{lcccccc} \toprule \(n\) & \multicolumn{2}{c}{100} & \multicolumn{2}{c}{200} & \multicolumn{2}{c}{500} \\ \cmidrule(lr){2-3}\cmidrule(lr){4-5}\cmidrule(lr){6-7} \(\alpha\) & 5\% & 10\% & 5\% & 10\% & 5\% & 10\% \\ \midrule""" footer = r""" \addlinespace[1em] \bottomrule \end{tabular} \caption{Observed rejection probabilities for bootstrap test.} \label{tab:results} \end{table}""" drifts = ['0', '2', 'X', '2 - X', 'X * T'] drift_names = ['0', '2', 'X_t', '2 - X_t', 'tX_t'] vols = ['1 + X', '1 + sin(5 * X)', '1 + X * exp(T)', '1 + X * sin(5 * T)', '1 + X * T'] vol_names = ['1 + X_t', '1 + \\sin{5X_t}', '1 + X_t \\exp{t}', '1 + X_t \\sin{5t}', '1 + tX_t'] vols2 = ['sqrt(1 + pow(X, 2))', '1', 'sqrt(5 * pow(fabs(X), 1.5))', 'sqrt(5 * fabs(X))', 'fabs(1 + X)'] vol2_names = ['1 + X_t^2', '1', '5\\abs{X_t}^{3/2}', '5\\abs{X_t}', '(1+X_t)^2'] args = [('Constant volatility, null, \\(\\mu_t = 1\\)', '0', '1', drifts, ['1'], 'b_t', drift_names), ('Constant volatility, alternative, \\(b_t = X_t\\)', 'X', '1', ['X'], vols, '\\sqrt{\\mu_t}', vol_names), ('Proportional volatility, null, \\(\mu_t = X_t^2\\)', '0', 'X', drifts, ['X'], 'b_t', drift_names), ('Proportional volatility, alternative, \\(b_t = 2 - X_t\\)', '2 - X', 'X', ['2 - X'], vols2, '\\mu_t', vol2_names)] start = time.time() pool = multiprocessing.Pool(4) s = pool.map(rpn, args) s = header + ''.join(s) + footer with open('sdgft-tables.tex', 'w') as f: f.write(s) end = time.time() mins = np.floor((end - start) / 60) hrs = np.floor(mins / 60) mins = mins % 60 print('Done (N = %d, t = %dh %dm)' % (N, hrs, mins)) main()