# -*- coding: utf-8 -*-
################################################################################
# Copyright 2014, The Open Aggregator
# GNU General Public License, Ver. 3 (see docs/license.txt)
################################################################################
__copyright__ = "Copyright 2014, The Open Aggregator"
__license__ = "GPL"
__author__ = "James Rising"
__credits__ = ["James Rising", "Solomon Hsiang", "Bob Kopp"]
__maintainer__ = "James Rising"
__email__ = "jar2234@columbia.edu"
__status__ = "Production"
__version__ = "$Revision$"
# $Source$
import csv, math, string, random, traceback
import numpy as np
from scipy.interpolate import UnivariateSpline
from scipy.stats import norm
from scipy.special import erf
from model import Model
from univariate_model import UnivariateModel
from memoizable import MemoizableUnivariate
[docs]class SplineModel(UnivariateModel, MemoizableUnivariate):
'''
Model Spline File
Each line in a model spline file represents a polynomial segment in
log-probability space. The format is as follows::
spp1
<x>,<y0>,<y1>,<a0>,<a1>,<a2>
...
Each line describes a segment of a probability distribution of y,
conditional on x = ``<x>``. The segment spans from ``<y0>`` to
``<y1>``, where the lowest value of ``<y0>`` may be ``-inf``, and the
highest value of ``<y1>`` may be ``inf``. The ``<x>`` values may also
be categorical or numerical. If they are numerical, it is assumed
that these values represent samples of a smoothly varying function (a
cubic spline in every y).
The values ``<a0>``, ``<a1>`` and ``<a2>`` are the polynomial
coefficients in y (with quadratic coefficients, only normal or
exponential tails are possible). The final segment of the probability
function is::
exp(a0 + a1 y + a2 y2)
Parameters
----------
xx_is_categorical : bool
xx : list-like
conditionals :
scaled : bool
'''
posinf = float('inf')
neginf = float('-inf')
samples = 1000
def __init__(self, xx_is_categorical=False, xx=None, conditionals=None, scaled=True):
super(SplineModel, self).__init__(xx_is_categorical, xx, scaled)
if conditionals is not None:
self.conditionals = conditionals
else:
self.conditionals = []
def __repr__(self):
return "Spline model"
[docs] def kind(self):
return 'spline_model'
[docs] def copy(self):
conditionals = []
for conditional in self.conditionals:
conditionals.append(conditional.copy())
return SplineModel(self.xx_is_categorical, list(self.get_xx()), conditionals, scaled=self.scaled)
[docs] def get_xx(self):
if self.xx_is_categorical:
return self.xx_text
else:
return self.xx
[docs] def eval_pval(self, x, p, threshold=1e-3):
conditional = self.get_conditional(x)
return conditional.get_pval(p, threshold)
[docs] def scale_y(self, a):
for conditional in self.conditionals:
conditional.scale_y(a)
if self.scaled:
conditional.rescale()
return self
[docs] def scale_p(self, a):
for conditional in self.conditionals:
conditional.scale_p(a)
if self.scaled:
conditional.rescale()
return self
[docs] def filter_x(self, xx):
conditionals = []
for x in xx:
conditionals.append(self.get_conditional(x))
return SplineModel(self.xx_is_categorical, xx, conditionals, scaled=self.scaled)
[docs] def interpolate_x(self, newxx):
'''
Determines whether argument `newxx` a subset of index ``xx``.
'''
subset = True
for x in newxx:
if x not in self.get_xx():
subset = False
if subset:
return self.filter_x(newxx)
#(limits, ys) = SplineModelConditional.propose_grid(self.conditionals)
#ddp_model = self.to_ddp(ys).interpolate_x(newxx)
#return SplineModel.from_ddp(ddp_model, limits)
conditionals = []
for x in newxx:
conditionals.append(self.get_conditional(x).copy())
return SplineModel(self.xx_is_categorical, newxx, conditionals, True)
# Only for categorical models
[docs] def recategorize_x(self, oldxx, newxx):
"""Construct a new model with categorical x values 'newxx', using the conditionals currently assigned to categorical x values 'oldxx'."""
conditionals = []
for ii in range(len(oldxx)):
if oldxx[ii] == -1 or (not isinstance(oldxx[ii], str) and not isinstance(oldxx[ii], unicode) and np.isnan(oldxx[ii])): # Not available
conditionals.append(SplineModelConditional.make_gaussian(-np.inf, np.inf, np.nan, np.nan))
else:
conditionals.append(self.get_conditional(oldxx[ii]))
return SplineModel(True, newxx, conditionals, scaled=self.scaled)
[docs] def add_conditional(self, x, conditional):
if not self.xx_is_categorical:
try:
self.xx.append(float(x))
self.xx_text.append(str(x))
except ValueError:
self.xx_is_categorical = True
if self.xx_is_categorical:
self.xx_text.append(x)
self.xx.append(len(self.xx))
self.conditionals.append(conditional)
[docs] def get_conditional(self, x):
if x is None or x == '' or len(self.conditionals) == 1:
return self.conditionals[0]
try:
return self.conditionals[self.xx_text.index(str(x))]
except Exception as e:
return SplineModelConditional.find_nearest(self.xx, x, self.conditionals)
[docs] def write_file(self, filename, delimiter):
with open(filename, 'w') as fp:
self.write(fp, delimiter)
[docs] def write(self, file, delimiter):
if self.scaled:
file.write("spp1\n")
else:
file.write("spv1\n")
writer = csv.writer(file, delimiter=delimiter)
for ii in range(len(self.xx)):
for jj in range(len(self.conditionals[ii].y0s)):
row = [self.xx_text[ii], self.conditionals[ii].y0s[jj], self.conditionals[ii].y1s[jj]]
row.extend(self.conditionals[ii].coeffs[jj])
writer.writerow(row)
[docs] def write_gaussian(self, file, delimiter):
writer = csv.writer(file, delimiter=delimiter)
writer.writerow(['dpc1', 'mean', 'sdev'])
for ii in range(len(self.xx)):
for jj in range(len(self.conditionals[ii].y0s)):
if len(self.conditionals[ii].coeffs[jj]) == 1 and self.conditionals[ii].coeffs[jj][0] == SplineModel.neginf:
continue
elif len(self.conditionals[ii].coeffs[jj]) == 3:
writer.writerow([self.xx_text[ii], self.conditionals[ii].gaussian_mean(jj), self.conditionals[ii].gaussian_sdev(jj)])
else:
writer.writerow([self.xx_text[ii], None, None])
[docs] def write_gaussian_plus(self, file, delimiter):
writer = csv.writer(file, delimiter=delimiter)
writer.writerow(['dpc1', 'y0', 'y1', 'mean', 'sdev'])
for ii in range(len(self.xx)):
for jj in range(len(self.conditionals[ii].y0s)):
if len(self.conditionals[ii].coeffs[jj]) == 1 and self.conditionals[ii].coeffs[jj][0] == SplineModel.neginf:
continue
elif len(self.conditionals[ii].coeffs[jj]) == 3:
writer.writerow([self.xx_text[ii], self.conditionals[ii].y0s[jj], self.conditionals[ii].y1s[jj], self.conditionals[ii].gaussian_mean(jj), self.conditionals[ii].gaussian_sdev(jj)])
else:
writer.writerow([self.xx_text[ii], self.conditionals[ii].y0s[jj], self.conditionals[ii].y1s[jj], None, None])
[docs] def to_points_at(self, x, ys):
conditional = self.get_conditional(x)
return conditional.to_points(ys)
[docs] def cdf(self, xx, yy):
conditional = self.get_conditional(xx)
return conditional.cdf(yy)
[docs] def is_gaussian(self, x=None):
conditional = self.get_conditional(x)
return len(conditional.y0s) == 1 and len(conditional.coeffs[0]) == 3
[docs] def get_mean(self, x=None):
if not isinstance(x, str) and not isinstance(x, unicode) and np.isnan(x):
return np.nan
conditional = self.get_conditional(x)
if conditional.is_gaussian():
return conditional.gaussian_mean(0)
total = 0
for ii in range(conditional.size()):
total += conditional.nongaussian_xpx(ii)
return total
[docs] def get_sdev(self, x=None):
conditional = self.get_conditional(x)
if conditional.is_gaussian():
return conditional.gaussian_sdev(0)
total = 0
for ii in range(conditional.size()):
total += conditional.nongaussian_x2px(ii)
mean = self.get_mean(x)
return math.sqrt(total - mean**2)
[docs] def draw_sample(self, x=None):
conditional = self.get_conditional(x)
return conditional.draw_sample()
[docs] def init_from_spline_file(self, file, delimiter, status_callback=None):
line = string.strip(file.readline())
if line == "spp1":
self.scaled = True
elif line == 'spv1':
self.scaled = False
else:
raise ValueError("Unknown format: %s" % (line))
self.xx = []
self.xx_text = []
self.xx_is_categorical = False
self.conditionals = []
reader = csv.reader(file, delimiter=delimiter)
x = None
conditional = None
for row in reader:
if row[0] != x:
x = row[0]
conditional = SplineModelConditional()
self.add_conditional(x, conditional)
conditional.add_segment(float(row[1]), float(row[2]), map(float, row[3:]))
if status_callback:
status_callback("Parsing...", reader.line_num / (reader.line_num + 3.0))
if self.scaled:
for conditional in self.conditionals:
conditional.rescale()
return self
[docs] def to_ddp(self, ys=None):
if ys is None:
(limits, ys) = SplineModelConditional.propose_grid(self.conditionals)
pp = np.ones((len(self.xx), len(ys)))
for ii in range(len(self.xx)):
pp[ii,] = self.to_points_at(self.xx[ii], ys)
return DDPModel('ddp1', 'spline_model', self.xx_is_categorical, self.get_xx(), False, ys, pp, scaled=self.scaled)
### Memoizable
[docs] def eval_pval_index(self, ii, p, threshold=1e-3):
return self.conditionals[ii].get_pval(p, threshold)
### Class Methods
[docs] @staticmethod
def create_single(xxs, y0s, y1s, coeffss, order=None, xx_is_categorical=True):
conditionals = []
xx = []
for key in (xxs if order is None else order):
xx.append(key)
conditional = SplineModelConditional.make_single(y0s[key], y1s[key], coeffss[key])
conditionals.append(conditional)
return SplineModel(xx_is_categorical, xx, conditionals, True)
[docs] @staticmethod
def create_gaussian(xxs, order=None, xx_is_categorical=True):
"""xxs should be a dictionary of the form {x: (mean, variance)}."""
conditionals = []
xx = []
for key in (xxs if order is None else order):
xx.append(key)
mean = float(xxs[key][0])
var = float(xxs[key][1])
conditional = SplineModelConditional.make_gaussian(SplineModel.neginf, SplineModel.posinf, mean, var)
conditionals.append(conditional)
return SplineModel(xx_is_categorical, xx, conditionals, True)
[docs] @staticmethod
def from_ddp(ddp_model, limits):
lps = ddp_model.log_p()
conditionals = []
xx = []
for ii in range(len(ddp_model.xx)):
lp = lps[ii,]
updown = np.concatenate((np.linspace(-1000, -900, np.floor(len(lp)/2)), np.linspace(-900, -1000, np.ceil(len(lp)/2))))
lp[lp == SplineModel.neginf] = updown[lp == SplineModel.neginf]
spline = UnivariateSpline(ddp_model.yy, lp, k=2)
try:
conditionals.append(SplineModelConditional.make_conditional_from_spline(spline, limits).rescale())
xx.append(ddp_model.get_xx()[ii])
except Exception as e:
print e
print traceback.print_exc()
return SplineModel(ddp_model.xx_is_categorical, xx, conditionals, True)
[docs] @staticmethod
def merge(models):
for model in models:
if not model.scaled:
raise ValueError("Only scaled distributions can be merged.")
(models, xx) = UnivariateModel.intersect_x_all(models)
model = SplineModel()
for ii in range(len(xx)):
conditional = SplineModelConditional()
y0 = SplineModel.neginf
# Loop through each segment
while y0 != SplineModel.posinf:
y1 = SplineModel.posinf
coeffs = np.zeros(3)
for jj in range(len(models)):
modcond = models[jj].get_conditional(xx[ii])
for kk in range(len(modcond.y0s)):
if modcond.y0s[kk] <= y0 and modcond.y1s[kk] > y0:
if modcond.y1s[kk] < y1:
y1 = modcond.y1s[kk]
if np.all(np.isfinite(modcond.coeffs[kk])): # Ignore NA and Inf
coeffs[0:len(modcond.coeffs[kk])] = np.array(coeffs[0:len(modcond.coeffs[kk])]) + np.array(modcond.coeffs[kk])
while len(coeffs) > 0 and coeffs[-1] == 0:
coeffs = coeffs[0:-1]
conditional.add_segment(y0, y1, coeffs)
y0 = y1
model.add_conditional(xx[ii], conditional.rescale())
return model
[docs] @staticmethod
def combine(one, two):
if one.xx_is_categorical != two.xx_is_categorical:
raise ValueError("Cannot combine models that do not agree on categoricity")
if not one.scaled or not two.scaled:
raise ValueError("Cannot combine unscaled models")
(one, two, xx) = UnivariateModel.intersect_x(one, two)
conditionals = []
for ii in range(len(xx)):
conditionals.append(one.get_conditional(xx[ii]).convolve(two.get_conditional(xx[ii])).rescale())
return SplineModel(one.xx_is_categorical, xx, conditionals, True)
[docs]class SplineModelConditional():
# coeffs is ordered low-order to high-order
def __init__(self, y0s=None, y1s=None, coeffs=None):
if y0s is None:
self.y0s = np.array([])
self.y1s = np.array([])
self.coeffs = []
else:
self.y0s = np.array(y0s)
self.y1s = np.array(y1s)
self.coeffs = coeffs
[docs] def size(self):
return len(self.y0s)
[docs] def copy(self):
return SplineModelConditional(list(self.y0s), list(self.y1s), list(self.coeffs))
# Does not maintain scaling
[docs] def scale_y(self, a):
for ii in range(self.size()):
self.y0s[ii] *= a
self.y1s[ii] *= a
if len(self.coeffs[ii]) > 1:
self.coeffs[ii][1] /= a
if len(self.coeffs[ii]) > 2:
self.coeffs[ii][2] /= a*a
# Does not maintain scaling
[docs] def scale_p(self, a):
for ii in range(self.size()):
self.coeffs[ii] = map(lambda c: a*c, self.coeffs[ii])
# Does not check for overlapping segments
[docs] def add_segment(self, y0, y1, coeffs):
self.y0s = np.append(self.y0s, [y0])
self.y1s = np.append(self.y1s, [y1])
self.coeffs.append(coeffs)
indexes = np.argsort(self.y0s)
if indexes[-1] != len(self.y0s) - 1:
self.y0s = self.y0s[indexes]
self.y1s = self.y1s[indexes]
self.coeffs = [self.coeffs[index] for index in indexes]
# Note: after calling, need to set scaled on SplineModel object
[docs] def rescale(self):
integral = self.cdf(SplineModel.posinf)
if not np.isnan(integral) and integral > 0:
self.scale(1 / integral)
return self
[docs] def scale(self, factor):
if factor == 0:
self.y0s = [SplineModel.neginf]
self.y1s = [SplineModel.posinf]
self.coeffs = [[SplineModel.neginf]]
else:
for ii in range(self.size()):
self.coeffs[ii][0] = self.coeffs[ii][0] + math.log(factor)
# Similar to to_points
[docs] def evaluate(self, ii, y):
if y == SplineModel.neginf or y == SplineModel.posinf:
return 0
return np.exp(np.polyval(self.coeffs[ii][::-1], y))
# Similar to evaluate
[docs] def to_points(self, ys):
result = np.array(ys) * 0
for ii in range(len(self.y0s)):
valid = np.logical_and(ys >= self.y0s[ii], ys <= self.y1s[ii])
result[valid] = np.exp(np.polyval(self.coeffs[ii][::-1], ys[valid]))
return result
[docs] def partial_cdf(self, ii, y1):
if len(self.coeffs[ii]) == 0:
return np.nan
if len(self.coeffs[ii]) == 1:
if self.coeffs[ii][0] == SplineModel.neginf:
return 0
return np.exp(self.coeffs[ii][0]) * (y1 - self.y0s[ii])
elif len(self.coeffs[ii]) == 2:
return (np.exp(self.coeffs[ii][0]) / self.coeffs[ii][1]) * (np.exp(self.coeffs[ii][1] * y1) - np.exp(self.coeffs[ii][1] * self.y0s[ii]))
elif self.coeffs[ii][2] > 0:
if self.y0s[ii] == SplineModel.neginf or self.y1s[ii] == SplineModel.posinf:
raise ValueError("Improper area of spline")
myys = np.linspace(self.y0s[ii], y1, SplineModel.samples)
return sum(np.exp(np.polyval(self.coeffs[ii][::-1], myys))) * (y1 - self.y0s[ii]) / SplineModel.samples
else:
var = -.5 / self.coeffs[ii][2]
mean = self.coeffs[ii][1] * var
if np.isnan(mean) or np.isnan(var) or var <= 0:
return 0
exponent = self.coeffs[ii][0] - (-mean*mean / (2*var) + math.log(1 / math.sqrt(2*math.pi*var)))
if exponent > 100:
# math domain error!
return 0
rescale = math.exp(exponent)
below = 0
if float(self.y0s[ii]) != SplineModel.neginf:
below = norm.cdf(float(self.y0s[ii]), loc=mean, scale=math.sqrt(var))
if exponent > 20 and float(self.y0s[ii]) != SplineModel.neginf and float(self.y1s[ii]) != SplineModel.neginf and y1 != SplineModel.posinf:
# approaching math domain error: assume constant
total = rescale * (norm.cdf(self.y1s[ii], loc=mean, scale=math.sqrt(var)) - below)
return total * (y1 - self.y0s[ii]) / (self.y1s[ii] - self.y0s[ii])
return rescale * (norm.cdf(y1, loc=mean, scale=math.sqrt(var)) - below)
[docs] def cdf(self, yy):
integral = 0
for ii in range(len(self.y0s)):
if self.y1s[ii] >= yy:
y1 = yy
else:
y1 = self.y1s[ii]
integral += self.partial_cdf(ii, y1)
if self.y1s[ii] >= yy:
break
return integral
[docs] def draw_sample(self):
value = random.random()
return self.get_pval(value)
[docs] def get_pval(self, p, threshold=1e-3):
# Use finer thresholds later on
if p < .1:
threshold = threshold * p * 10
elif p > .9:
threshold = threshold * (1 - p) * 10
# First figure out which spline p is in
integral = 0
for ii in range(len(self.y0s)):
if ii == len(self.y0s) - 1:
break # this will bring us to 1
partial = self.partial_cdf(ii, self.y1s[ii])
if integral + partial > p:
break
integral += partial
y = SplineModelConditional.ascinv(p - integral, lambda y: self.partial_cdf(ii, y), self.y0s[ii], self.y1s[ii], threshold)
if np.isnan(y):
# Let's just give back some value
if self.y0s[0] < 0 and self.y1s[len(self.y1s)-1] > 0:
y = 0
else:
y = (self.y0s[0] + self.y1s[len(self.y1s)-1]) / 2
return y
# find the x for a given y of an ascending function
# copied from math.js
[docs] @staticmethod
def ascinv(y, func, minx, maxx, threshold):
tries = 0
while tries < 10000:
tries += 1
if (minx == SplineModel.neginf and maxx == SplineModel.posinf) or (minx == SplineModel.neginf and maxx > 0) or (minx < 0 and maxx == SplineModel.posinf):
midpoint = 0
elif minx == SplineModel.neginf:
midpoint = (maxx - 1.0) * 2
elif maxx == SplineModel.posinf:
midpoint = (minx + 1.0) * 2
else:
midpoint = (minx + maxx) / 2.0
error = func(midpoint) - y
if abs(error) < threshold:
return midpoint
elif np.isnan(error):
return np.nan
elif error > 0:
maxx = midpoint
elif error < 0:
minx = midpoint
return np.nan
[docs] def approximate_mean(self, limits):
rough_limits = self.rough_limits()
limits = (max(float(limits[0]), rough_limits[0]), min(float(limits[1]), rough_limits[1]))
ys = np.linspace(limits[0], limits[1], self.size() * SplineModel.samples)
ps = self.to_points(ys)
ps = ps / sum(ps)
return sum(ps * ys)
# Allow true gaussian or delta
[docs] def is_gaussian(self):
return len(self.y0s) == 1 and (len(self.coeffs[0]) == 3 or len(self.coeffs[0]) == 0)
[docs] def gaussian_sdev(self, ii):
if len(self.coeffs[ii]) == 0:
return 0
if self.coeffs[ii][2] == 0:
return np.inf
return 1/math.sqrt(-2*self.coeffs[ii][2])
[docs] def gaussian_mean(self, ii):
if len(self.coeffs[ii]) == 0:
return (self.y1s[ii] + self.y0s[ii]) / 2
if self.coeffs[ii][2] == 0:
return np.nan
return -self.coeffs[ii][1] / (2*self.coeffs[ii][2])
[docs] def nongaussian_xpx(self, ii):
a = self.coeffs[ii][2] if len(self.coeffs[ii]) > 2 else 0
b = self.coeffs[ii][1] if len(self.coeffs[ii]) > 1 else 0
c = self.coeffs[ii][0]
x0 = self.y0s[ii]
x1 = self.y1s[ii]
# From Matlab
if a == 0:
if x0 == SplineModel.neginf:
return (math.exp(c + b*x1)*(b*x1 - 1))/b**2
elif x1 == SplineModel.posinf:
return -(math.exp(c + b*x0)*(b*x0 - 1))/b**2
return (math.exp(c + b*x1)*(b*x1 - 1))/b**2 - (math.exp(c + b*x0)*(b*x0 - 1))/b**2
sqrtpi = math.pi**.5
na05 = ((-a)**.5)
na15 = ((-a)**1.5)
return (math.exp(a*x1**2 + b*x1)*math.exp(c))/(2*a) - (math.exp(a*x0**2 + b*x0)*math.exp(c))/(2*a) + (sqrtpi*b*math.exp(-b**2/(4*a))*math.exp(c)*erf((b + 2*a*x0)/(2*na05)))/(4*na15) - (sqrtpi*b*math.exp(-(b**2)/(4*a))*math.exp(c)*erf((b + 2*a*x1)/(2*na05)))/(4*na15)
[docs] def nongaussian_x2px(self, ii):
a = self.coeffs[ii][2] if len(self.coeffs[ii]) > 2 else 0
b = self.coeffs[ii][1] if len(self.coeffs[ii]) > 1 else 0
c = self.coeffs[ii][0]
x0 = self.y0s[ii]
x1 = self.y1s[ii]
# From Matlab
if a == 0:
if x0 == SplineModel.neginf:
return (math.exp(c + b*x1)*(b**2*x1**2 - 2*b*x1 + 2))/b**3
elif x1 == SplineModel.posinf:
return -(math.exp(c + b*x0)*(b**2*x0**2 - 2*b*x0 + 2))/b**3
return (math.exp(c + b*x1)*(b**2*x1**2 - 2*b*x1 + 2))/b**3 - (math.exp(c + b*x0)*(b**2*x0**2 - 2*b*x0 + 2))/b**3
sqrtpi = math.pi**.5
na05 = ((-a)**.5)
na25 = ((-a)**2.5)
na35 = ((-a)**3.5)
return (2*na25*b*math.exp(a*x0**2 + b*x0 + c) - 2*na25*b*math.exp(a*x1**2 + b*x1 + c) + 4*na35*x0*math.exp(a*x0**2 + b*x0 + c) - 4*na35*x1*math.exp(a*x1**2 + b*x1 + c) - 2*(sqrtpi)*a**3*math.exp((- b**2 + 4*a*c)/(4*a))*erf((b + 2*a*x0)/(2*na05)) + 2*(sqrtpi)*a**3*math.exp((- b**2 + 4*a*c)/(4*a))*erf((b + 2*a*x1)/(2*na05)) + (sqrtpi)*a**2*b**2*math.exp((- b**2 + 4*a*c)/(4*a))*erf((b + 2*a*x0)/(2*na05)) - (sqrtpi)*a**2*b**2*math.exp((- b**2 + 4*a*c)/(4*a))*erf((b + 2*a*x1)/(2*na05)))/(8*((-a)**4.5))
# Duplicated in models.js
[docs] def segment_max(self, jj):
maxyy = self.y0s[jj]
maxval = self.evaluate(jj, self.y0s[jj])
val = self.evaluate(jj, self.y1s[jj])
if (val > maxval):
maxval = val
maxyy = self.y1s[jj]
coeffs = self.coeffs[jj]
if len(coeffs) > 2:
yy = -coeffs[1] / (2*coeffs[2])
if yy > self.y0s[jj] and yy < self.y1s[jj]:
val = self.evaluate(jj, yy)
if val > maxval:
maxval = val
maxyy = yy
return (maxyy, maxval)
# Duplicated in models.js
# returns (yy, val)
[docs] def find_mode(self):
maxmax = (None, SplineModel.neginf)
for ii in range(self.size()):
mymax = self.segment_max(ii)
if mymax[1] > maxmax[1]:
maxmax = mymax
return maxmax
# Duplicated in models.js
[docs] def rough_span(self):
span = 0
for jj in range(self.size()):
if self.y0s[jj] == SplineModel.neginf or self.y1s[jj] == SplineModel.posinf:
if len(self.coeffs[jj]) == 3:
span += 3 / math.sqrt(abs(2*self.coeffs[jj][2]))
elif len(self.coeffs[jj]) == 2:
span += 5 / abs(self.coeffs[jj][1])
else:
span += 1 / abs(self.coeffs[jj][0]) # improper!
else:
span += self.y1s[jj] - self.y0s[jj]
return span
# Duplicated in models.js
[docs] def rough_limits(self):
limits0 = float(self.y0s[0])
limits1 = float(self.y1s[-1])
if limits0 == SplineModel.neginf or limits1 == SplineModel.posinf:
maxmax = self.find_mode()
span = self.rough_span()
if limits0 == SplineModel.neginf:
limits0 = maxmax[0] - span
if limits1 == SplineModel.posinf:
limits1 = maxmax[0] + span
return (limits0, limits1)
[docs] def convolve(self, other):
# NOTE: below is for a single segment...
# int_s e^P(s) e^Q(t - s) = int_s e^[P(s) + Q(t - s)] = int_s e^[a1 ss + b1 s + c1 + a2 (tt - 2ts + ss) + b2 t - b2 s + c2]
# int_s e^[(a1 + a2) ss + (b1 - 2t - b2) s] e^[a2 (tt) + b2 t + c1 + c2]
# Have to do approximate sum later anyway, so let's just convert to ddp
(limits, ys) = SplineModelConditional.propose_grid([self, other])
pp_self = self.to_points(ys)
pp_other = other.to_points(ys)
newpp = np.convolve(pp_self, pp_other)
newpp = newpp / sum(newpp) # Scale
yy = np.linspace(2*min(ys), 2*max(ys), 2*len(ys) - 1)
if np.any(newpp == 0):
conditional = SplineModelConditional()
# Break into many pieces
ii = 0
y0 = min(yy)
while ii == 0 or (ii < len(newpp) and newpp[ii] == 0):
if newpp[ii] == 0:
while ii < len(newpp) and newpp[ii] == 0:
ii += 1
if ii < len(newpp):
conditional.add_segment(y0, yy[ii], [SplineModel.neginf])
else:
conditional.add_segment(y0, yy[-1], [SplineModel.neginf])
break
y0 = yy[ii]
i0 = ii
while ii < len(newpp) and newpp[ii] > 0:
ii += 1
spline = UnivariateSpline(yy[i0:ii], np.log(newpp[i0:ii]), k=2, s=(ii - i0) / 1000.0)
if ii < len(newpp):
segments = SplineModelConditional.make_conditional_from_spline(spline, (y0, yy[ii]))
else:
segments = SplineModelConditional.make_conditional_from_spline(spline, (y0, yy[-1]))
for jj in range(segments.size()):
conditional.add_segment(segments.y0s[jj], segments.y1s[jj], segments.coeffs[jj])
if ii < len(newpp):
y0 = yy[ii]
else:
break
return conditional
else:
spline = UnivariateSpline(yy, np.log(newpp), k=2)
return SplineModelConditional.make_conditional_from_spline(spline, (2*limits[0], 2*limits[1]))
[docs] @staticmethod
def make_single(y0, y1, coeffs):
return SplineModelConditional(y0s=[y0], y1s=[y1], coeffs=[coeffs])
[docs] @staticmethod
def make_gaussian(y0, y1, mean, var):
return SplineModelConditional.make_single(y0, y1, [-mean*mean/(2*var) - np.log(np.sqrt(2*math.pi*var)), mean/var, -1/(2*var)])
[docs] @staticmethod
def make_conditional_from_spline(spline, limits):
conditional = SplineModelConditional()
knots = spline.get_knots()
midpoint = (knots[-1] + knots[1]) / 2
knots = sorted(knots)
knots[0] = float(limits[0])
knots[-1] = float(limits[1])
for ii in range(1, len(knots)):
if knots[ii-1] == SplineModel.neginf and knots[ii] == SplineModel.posinf:
y = midpoint
elif knots[ii-1] == SplineModel.neginf:
y = knots[ii]
elif knots[ii] == SplineModel.posinf:
y = knots[ii-1]
else:
y = (knots[ii-1] + knots[ii]) / 2
derivs = spline.derivatives(y)
a = derivs[2] / 2
b = derivs[1] - derivs[2] * y
c = derivs[0] - (a*y*y + b*y)
if a > 0 and (knots[ii-1] == SplineModel.neginf or knots[ii] == SplineModel.posinf):
conditional.add_segment(knots[ii-1], knots[ii], [SplineModel.neginf]) # This segment failed!
else:
conditional.add_segment(knots[ii-1], knots[ii], [c, b, a])
return conditional
[docs] @staticmethod
def find_nearest(array, value, within):
if isinstance(value, str) or isinstance(value, unicode):
try:
value = int(value)
except:
raise ValueError("Cannot apply find_nearest to categorical values.")
idx = (np.abs(np.array(array)-value)).argmin()
return within[idx]
[docs] @staticmethod
def approximate_sum(conditionals):
if len(conditionals) == 1:
return conditionals[0]
(limits, ys) = SplineModelConditional.propose_grid(conditionals)
ps = np.zeros(len(ys))
for ii in range(len(conditionals)):
ps = ps + conditionals[ii].to_points(ys)
lps = np.log(ps)
spline = UnivariateSpline(ys, lps, k=2)
return SplineModelConditional.make_conditional_from_spline(spline, limits)
[docs] @staticmethod
def propose_grid(conditionals):
limits = (SplineModel.neginf, SplineModel.posinf)
rough_limits = (SplineModel.posinf, SplineModel.neginf)
max_segments = 0
for conditional in conditionals:
if conditional.y0s[0] == conditional.y1s[-1] or np.isnan(conditional.y0s[0]) or np.isnan(conditional.y1s[-1]):
continue
limits = (max(limits[0], conditional.y0s[0]), min(limits[1], conditional.y1s[-1]))
conditional_rough_limits = conditional.rough_limits()
rough_limits = (min(rough_limits[0], conditional_rough_limits[0]), max(rough_limits[1], conditional_rough_limits[1]))
max_segments = max(max_segments, sum(map(lambda cc: len(cc), conditional.coeffs)))
num_points = 100 * max_segments / (1 + np.log(len(conditionals)))
ys = np.linspace(rough_limits[0], rough_limits[1], num_points)
return (limits, ys)
from ddp_model import DDPModel
Model.mergers["spline_model"] = SplineModel.merge
Model.mergers["spline_model+ddp_model"] = lambda models: DDPModel.merge(map(lambda m: m.to_ddp(), models))
Model.combiners['spline_model+spline_model'] = SplineModel.combine
Model.combiners["spline_model+ddp_model"] = lambda one, two: DDPModel.combine(one.to_ddp(), two)