Source code for openest.models.delta_model

from scipy.interpolate import interp1d
import numpy as np

from model import Model
from univariate_model import UnivariateModel

[docs]class DeltaModel(UnivariateModel): def __init__(self, xx_is_categorical=False, xx=None, locations=None, scale=1): super(DeltaModel, self).__init__(False, xx, scale == 1) self.scale = scale self.locations = locations
[docs] def kind(self): return 'delta_model'
[docs] def copy(self): return DeltaModel(self.xx_is_categorical, list(self.get_xx()), self.locations[:], scale=self.scale)
[docs] def scale_y(self, a): self.location = map(lambda loc: loc * a, self.locations) return self
[docs] def scale_p(self, a): """Raising a delta function to a power makes no difference.""" return self
[docs] def filter_x(self, xx): return DeltaModel(self.xx_is_categorical, xx, map(lambda x: self.locations[xx.index(x)], xx), scale=self.scale)
[docs] def interpolate_x(self, newxx, kind='quadratic'): fx = interp1d(self.xx, self.locations, kind) newlocs = fx(newxx) return DeltaModel(self.xx_is_categorical, newxx, newlocs, scale=self.scale)
[docs] def write_file(self, filename, delimiter): with open(filename, 'w') as fp: self.write(fp, delimiter)
[docs] def write(self, file, delimiter): file.write("del1" + delimiter + str(self.scale) + "\n") for ii in range(len(self.xx)): file.write(str(self.xx[ii]) + delimiter + str(self.locations[ii]) + "\n")
[docs] def to_points_at(self, x, ys): return map(lambda y: self.scale if y == self.locations[self.xx.index(x)] else 0, ys)
[docs] def draw_sample(self, x=None): return self.locations[self.xx.index(x)]
[docs] def cdf(self, xx, yy): location = self.locations[self.xx.index(xx)] if yy < location: return 0 else: return self.scale
[docs] def init_from_delta_file(self, file, delimiter, status_callback=None): reader = csv.reader(file, delimiter=delimiter) header = reader.next() if header[0] != "del1": raise ValueError("Unknown format: %s" % (fields[0])) self.scale = float(header[1]) self.locations = [] xx_text = [] xx = [] self.xx_is_categorical = False for row in reader: xx_text.append(row[0]) try: xx.append(float(row[0])) except ValueError: xx.append(len(xx)) self.xx_is_categorical = True self.locations.append(float(row[1])) self.xx = xx self.xx_text = xx_text
[docs] @staticmethod def merge(models): for model in models: if not model.scaled: raise ValueError("Only scaled distributions can be merged.") if isinstance(models[1], DeltaModel): # Check to see if all are equal-- otherwise error! for model in models[1:]: if len(model.locations) != len(models[0].locations): raise ValueError("Inconsident x-values in delta merge") for jj in range(len(model.locations)): if model.locations[jj] != models[0].locations[jj]: raise ValueError("Non-matching location in delta merge") # Either identical deltas, or only one delta return models[0]
[docs] @staticmethod def combine(one, two): if one.xx_is_categorical != two.xx_is_categorical: raise ValueError("Cannot combine models that do not agree on categoricity") if not one.scaled or not two.scaled: raise ValueError("Cannot combine unscaled models") if np.all(np.array(one.locations) == 0): return two if not isinstance(two, DeltaModel): raise ValueError("Combining non-zero delta with non-delta is not yet implemented.") (one, two, xx) = UnivariateModel.intersect_x(one, two) return DeltaModel(one.xx_is_categorical, xx, map(lambda ii: one.locations[ii] + two.locations[ii], range(len(xx))), 1)
[docs] @staticmethod def zero_delta(model): # The equivalent of scaling model to 0 return DeltaModel(model.xx_is_categorical, model.xx, np.zeros(len(model.xx)), 1)
Model.mergers["delta_model"] = DeltaModel.merge Model.mergers["delta_model+ddp_model"] = DeltaModel.merge Model.mergers["delta_model+spline_model"] = DeltaModel.merge Model.combiners['delta_model+delta_model'] = DeltaModel.combine Model.combiners['delta_model+ddp_model'] = DeltaModel.combine Model.combiners['delta_model+spline_model'] = DeltaModel.combine