Source code for openest.models.memoizable

import numpy as np
from univariate_model import UnivariateModel
from scipy.interpolate import UnivariateSpline

[docs]class MemoizableUnivariate(object): #def get_index(self, x): # raise NotImplementedError("get_index not implemented") # Only for a non-categorical
[docs] def get_edges(self): return (np.concatenate(([float('-inf')], self.get_xx())) + np.concatenate((self.get_xx(), [float('inf')]))) / 2
[docs] def eval_pval_index(self, ii, p, threshold=1e-3): raise NotImplementedError("eval_pval_index not implemented")
[docs]class MemoizedUnivariate(UnivariateModel): def __init__(self, model): super(MemoizedUnivariate, self).__init__(model.xx_is_categorical, model.xx, model.scaled) assert(isinstance(model, MemoizableUnivariate)) self.model = model # Only for non-categorical self.x_cache_multiple = None self.reset_cache()
[docs] def reset_cache(self): self.eval_pval_cache = {} # {ii => {p => result}} if self.model.xx_is_categorical: self.index_cache = {} else: if self.x_cache_multiple is not None: self.index_cache = {} else: self.index_cache = None self.edges = self.model.get_edges()
[docs] def set_x_cache_decimals(self, decimals): self.x_cache_multiple = float(10**decimals) self.reset_cache()
[docs] def get_index(self, x): if self.index_cache is None: return np.searchsorted(self.edges, x) - 1 elif self.model.xx_is_categorical: index = self.index_cache.get(x, None) if index is None: index = self.model.get_xx().index(str(x)) if x is not None else 0 self.index_cache[x] = index return index else: # Round to the given decimals xmul = int(x * self.x_cache_multiple) index = self.index_cache.get(xmul, None) if index is None: index = np.searchsorted(self.edges, xmul / self.x_cache_multiple) - 1 self.index_cache[xmul] = index return index
[docs] def get_indexes(self, xs): if self.index_cache is None: return np.searchsorted(self.edges, xs) - 1 elif self.model.xx_is_categorical: return [self.get_index(x) for x in xs] else: # Round to the given decimals xmuls = (xs * self.x_cache_multiple).astype(int) indexes = [] for xmul in xmuls: index = self.index_cache.get(xmul, None) if index is None: index = min(np.searchsorted(self.edges, xmul / self.x_cache_multiple) - 1, len(self.edges) - 2) self.index_cache[xmul] = index indexes.append(index) return indexes
[docs] def kind(self): return self.model.kind
[docs] def copy(self): return MemoizedUnivariate(self.model.copy())
[docs] def get_xx(self): return self.xx
[docs] def scale_y(self, a): self.model.scale_y(a) self.reset_cache() return self
[docs] def scale_p(self, a): self.model.scale_p(a) self.reset_cache() return self
[docs] def interpolate_x(self, newxx): return self.model.interpolate_x(newxx)
[docs] def write_file(self, filename, delimiter): self.model.write_file(filename, delimiter)
[docs] def write(self, file, delimiter): self.model.write(file, delimiter)
[docs] def eval_pval(self, x, p, threshold=1e-3): ii = self.get_index(x) cache = self.eval_pval_cache.get(ii, None) if cache is None: self.eval_pval_cache[ii] = {} cache = {} cached = cache.get(p, None) if cached is not None: return cached y = self.model.eval_pval_index(ii, p, threshold) cache[p] = y return y
[docs] def eval_pvals(self, xs, p, threshold=1e-3): iis = self.get_indexes(xs) ys = [] for ii in iis: cache = self.eval_pval_cache.get(ii, None) if cache is None: self.eval_pval_cache[ii] = {} cache = {} y = cache.get(p, None) if y is None: y = self.model.eval_pval_index(ii, p, threshold) cache[p] = y ys.append(y) return ys
[docs] def get_eval_pval_spline(self, p, limits, threshold=1e-3, linextrap=False): if len(self.get_xx()) == 1: xx = self.model.get_xx() yy = np.array(self.eval_pvals(xx, p, threshold)) xx = np.array(map(float, xx)) else: xx = np.array(self.model.get_xx()) yy = np.array(self.eval_pvals(xx, p, threshold)) yy[np.isnan(yy)] = 0 if not linextrap or len(xx) == 1: xx = np.concatenate(([limits[0]], xx, [limits[1]])) yy = np.concatenate(([yy[0]], yy, [yy[-1]])) spline = UnivariateSpline(xx, yy, s=0, k=1) return spline