Source code for openest.models.ddp_model

# -*- coding: utf-8 -*-
################################################################################
# Copyright 2014, The Open Aggregator
#   GNU General Public License, Ver. 3 (see docs/license.txt)
################################################################################


__copyright__ = "Copyright 2014, The Open Aggregator"
__license__ = "GPL"

__author__ = "James Rising"
__credits__ = ["James Rising", "Solomon Hsiang"]
__maintainer__ = "James Rising"
__email__ = "jar2234@columbia.edu"

__status__ = "Production"
__version__ = "$Revision$"
# $Source$

import csv, random
from numpy import *
from scipy.interpolate import interp1d

from model import Model
from univariate_model import UnivariateModel
from memoizable import MemoizableUnivariate

[docs]class DDPModel(UnivariateModel, MemoizableUnivariate): ''' Discrete-Discrete-Probability (DDP) Format A DDP file describes a dose-response relationship with a limited collection of response outcomes. The dose and response values may be either categorical or sampled at a collection of numerical levels. ``<y-value-1>``, ..., ``<y-value-N>`` and ``<x-value-1>``, ..., ``<x-value-N>`` are either strings (for named categories) or numerical values. The format of a DDP file is:: <format>,<y-value-1>,<y-value-2>,... <x-value-1>,p(y1|x1),p(y2|x1),... <x-value-2>,p(y1|x2),p(y2|x2),... Below is a sample categorical DDP file:: ddp1,live,dead control,.5,.5 treated,.9,.1 Below is a sample numerical DDP file:: ddp1,-10.0,-.33333333333,3.33333333333,10.0 0.0,0.5,0.5,0.0,0.0 13.3333333333,0.0,0.5,0.5,0.0 26.6666666667,0.0,0.0,0.5,0.5 40.0,0.0,0.0,0.0,0.5 Parameters ---------- p_format : str Probability format. May be one of the following values: * ``ddp1`` - the p(.) values are simple probabilities (0 < p(.) < 1 and sum p(y|x) = 1) * ``ddp2`` - the p(.) values are log probabilities source : str Metadata attribute. Name of file this object was read in from. xx_is_categorical : bool Indicates whether ``xx`` is categorical. False indicates numeric data. xx : list-like X axis index yy_is_categorical : bool Indicates whether ``yy`` is categorical. False indicates numeric data. yy : list-like Y axis index pp : array-like underlying numpy(?) data array unaccounted : numpy.array column of remaining probability. ``unaccounted = 1-sum(pp, axis=1)``. scaled : bool Indicates whether data has been scaled. If scaled, re-scale so ``pp.sum(axis=1)==1``. ''' def __init__(self, p_format=None, source=None, xx_is_categorical=False, xx=None, yy_is_categorical=False, yy=None, pp=None, unaccounted=None, scaled=True): super(DDPModel, self).__init__(xx_is_categorical, xx, scaled) self.p_format = p_format self.source = source self.yy_is_categorical = yy_is_categorical if yy_is_categorical: self.yy = range(len(yy)) self.yy_text = yy elif yy is not None: self.yy = yy self.yy_text = map(str, yy) self.pp = pp self.unaccounted = unaccounted def __repr__(self): ''' string representation ''' if self.source is None: return "DDP model" else: return "DDP model from " + str(self.source)
[docs] def kind(self): ''' returns model type ("ddp_model") ''' return 'ddp_model'
[docs] def copy(self): ''' copy data and return DDPModel with the same data ''' # Can't use python's copy since could be strange object from ming <-- note: could implement __copy__ return DDPModel(self.p_format, getattr(self, 'source', 'external'), self.xx_is_categorical, list(self.get_xx()), self.yy_is_categorical, list(self.get_yy()), array(self.pp), unaccounted=getattr(self, 'unaccounted', 0), scaled=self.scaled)
[docs] def get_xx(self): ''' returns x axis index ''' if self.xx_is_categorical: return self.xx_text else: return self.xx
[docs] def get_yy(self): ''' returns x axis index ''' if self.yy_is_categorical: return self.yy_text else: return self.yy
[docs] def rescale(self, as_ddp=True): ''' Can rescale non-ddp (that is, as sampling of continuous distribution) ''' if as_ddp or self.yy_is_categorical: newpp = self.lin_p() for ii in range(len(self.xx)): newpp[ii,] = newpp[ii,] / sum(newpp[ii,]) else: sorts = sorted(self.get_yy()) if len(sorts) > 0: newpp = self.lin_p() for ii in range(len(self.xx)): cdf = 0 cdf += (sorts[1] - sorts[0]) * newpp[ii, 0] for jj in range(1, len(sorts)-1): cdf += (sorts[jj+1] - sorts[jj-1]) * newpp[ii, jj] / 2 cdf += (sorts[-1] - sorts[-2]) * newpp[ii, -1] newpp[ii,] = newpp[ii,] / float(cdf) self.pp = newpp self.p_format = 'ddp1' self.scaled = as_ddp or self.yy_is_categorical return self
[docs] def eval_pval(self, x, p, threshold=1e-3): return self.eval_pval_index(self.get_closest(x), p, threshold)
[docs] def scale_y(self, a): ''' multiply index y (numeric only) by scale factor a ''' if self.yy_is_categorical: raise ValueError("Cannot scale on a categorical y") self.yy = [y * a for y in self.yy] return self
[docs] def scale_p(self, a): ''' coerce to ddp2 (log probability) format and scale by a ''' self.pp = a * self.log_p() self.p_format = 'ddp2' return self.rescaled()
[docs] def add_to_y(self, a): ''' add value a to each element of index y (numeric only) ''' if self.yy_is_categorical: raise ValueError("Cannot add to a categorical y") self.yy = [y + a for y in self.yy] return self
[docs] def transpose(self): ''' transpose data structure ''' other = self.copy() other.pp = transpose(other.pp) other.xx_is_categorical = self.yy_is_categorical other.xx = list(self.yy) other.xx_text = self.yy_text other.yy_is_categorical = self.xx_is_categorical other.yy = list(self.xx) other.yy_text = self.xx_text return other
[docs] def write_file(self, filename, delimiter): ''' write CSV to file path ''' with open(filename, 'w') as fp: self.write(fp, delimiter)
[docs] def write(self, file, delimiter): ''' write CSV to file object ''' writer = csv.writer(file, delimiter=delimiter) if self.scaled: header = [self.p_format] elif self.p_format == 'ddp1': header = ['ddv1'] elif self.p_format == 'ddp2': header = ['ddv2'] if self.yy_is_categorical: header.extend(self.yy_text) else: header.extend(self.yy) writer.writerow(header) for ii in range(len(self.xx)): if self.xx_is_categorical: row = [self.xx_text[ii]] row.extend(self.pp[ii,]) writer.writerow(row) else: row = [self.xx[ii]] row.extend(self.pp[ii,]) writer.writerow(row)
[docs] def lin_p(self): ''' convert any DDPModel to ddp1 (linear probability) format ''' if self.p_format == 'ddp1': return self.pp elif self.p_format == 'ddp2': return exp(self.pp) else: return NotImplementedError("Unknown format in lin_p: " + self.p_format)
[docs] def log_p(self): ''' convert any DDPModel to ddp2 (log probability) format ''' if self.p_format == 'ddp1': pp = ones((len(self.xx), len(self.yy))) * float('-inf') pp[self.pp > 0] = log(self.pp[self.pp > 0]) return pp elif self.p_format == 'ddp2': return self.pp else: return NotImplementedError("Unknown format in log_p: " + self.p_format)
[docs] def filter_x(self, xx): ''' Slice DDPModel data such that the values of the x index == xx ''' newpp = ones((len(xx), len(self.yy))) for ii in range(len(xx)): newpp[ii,] = self.pp[self.get_xx() == xx,] return DDPModel(self.p_format, 'filter_x', self.xx_is_categorical, xx, self.yy_is_categorical, self.get_yy(), newpp, scaled=self.scaled)
[docs] def interpolate_x(self, newxx, kind='quadratic'): ''' custom interpolation method. wrapper around scipy.interp1d. Parameters ---------- newxx : list-like new x axis kind : str interpolation method, passed to scipy.interp1d ''' newpp = zeros((len(newxx), len(self.yy))) # Interpolate for each y pp = self.lin_p() xx = self.xx if min(newxx) < min(xx): xx = concatenate(([min(newxx)], xx)) pp = vstack((zeros((1, len(self.yy))), pp)) if max(newxx) > max(xx): xx = concatenate((xx, [max(newxx)])) pp = vstack((pp, zeros((1, len(self.yy))))) for jj in range(len(self.yy)): fx = interp1d(xx, pp[:,jj], kind) newpp[:,jj] = fx(newxx) # Rescale if self.scaled: for ii in range(len(newxx)): newpp[ii,] = newpp[ii,] / sum(newpp[ii,]) return DDPModel('ddp1', 'interpolate_x', False, newxx, False, self.yy, newpp, scaled=self.scaled)
# Only for categorical models
[docs] def recategorize_x(self, oldxx, newxx): newpp = zeros((len(newxx), len(self.yy))) pp = self.lin_p() for ii in range(len(oldxx)): newpp[ii,] = self.pp[self.get_xx() == oldxx[ii],] return DDPModel('ddp1', 'recategorize_x', True, newxx, self.yy_is_categorical, self.yy, newpp, scaled=self.scaled)
[docs] def interpolate_y(self, newyy, kind='quadratic'): ''' custom interpolation method. wrapper around scipy.interp1d. Parameters ---------- newyy : list-like new y axis kind : str interpolation method, passed to scipy.interp1d ''' newpp = zeros((len(self.xx), len(newyy))) # Interpolate for each y pp = self.lin_p() for ii in range(len(self.xx)): if len(self.yy) == 2: fx = interp1d(self.yy, pp[ii,:], 'linear') else: fx = interp1d(self.yy, pp[ii,:], kind) newpp[ii,:] = fx(newyy) if self.scaled: newpp[ii,] = newpp[ii,] / sum(newpp[ii,]) return DDPModel('ddp1', 'interpolate_y', self.xx_is_categorical, self.get_xx(), False, newyy, newpp, scaled=self.scaled)
[docs] def get_closest(self, x=None): ''' return closest index on x axis If x index is categorical, coerce x to string and find first matching index. If numeric, find the closest value. If x is None (default), return 0 ''' if x is None: return 0 try: return self.xx_text.index(str(x)) except: idx = (abs(array(self.xx)-x)).argmin() return idx
[docs] def get_mean(self, x=None): ''' Returns the mean of the y-index labels weighted by p values in row x If x is None (default), use first row. Uses self.get_closest(x) to find matching nearest match for x-index label x ''' if not self.scaled: raise ValueError("Cannot take mean of unscaled distribution.") ps = self.lin_p()[self.get_closest(x), :] return sum(ps * self.yy)
[docs] def get_sdev(self, x=None): ''' Returns the std dev of the y-index labels weighted by p values in row x If x is None (default), use first row. Uses self.get_closest(x) to find matching nearest match for x-index label x ''' if not self.scaled: raise ValueError("Cannot take sdev of unscaled distribution.") ps = self.lin_p()[self.get_closest(x), :] mean = sum(ps * self.yy) vari = sum(ps * square(self.yy - mean)) return sqrt(vari)
[docs] def draw_sample(self, x=None): ''' Randomly sample label from y-index using p values in row x If x is None (default), use first row. Uses self.get_closest(x) to find matching nearest match for x-index label x ''' if not self.scaled: raise ValueError("Cannot draw sample from unscaled distribution.") ps = self.lin_p()[self.get_closest(x), :] value = random.random() total = 0 for ii in range(len(ps)): total += ps[ii] if total > value: return self.yy[ii] return self.yy[-1]
[docs] def init_from(self, file, delimiter, status_callback=None, source=None): ''' Read DDP data set from file ''' reader = csv.reader(file, delimiter=delimiter) header = reader.next() fmt = header[0] if fmt not in ['ddp1', 'ddp2', 'ddv1', 'ddv2']: raise ValueError("Unknown format: %s" % (fmt)) self.source = source if fmt == 'ddp1' or fmt == 'ddp2': self.p_format = fmt self.scaled = True elif fmt == 'ddv1': self.p_format = 'ddp1' self.scaled = False elif fmt == 'ddv2': self.p_format = 'ddp2' self.scaled = False yy_text = header[1:] yy = [] self.yy_is_categorical = False for jj in range(len(yy_text)): try: yy.append(float(yy_text[jj])) except ValueError: yy.append(jj) self.yy_is_categorical = True pp = None xx_text = [] xx = [] self.xx_is_categorical = False for row in reader: if pp is None: pp = array([map(float, row[1:])]) else: pp = vstack((pp, map(float, row[1:]))) xx_text.append(row[0]) try: xx.append(float(row[0])) except ValueError: xx.append(len(xx)) self.xx_is_categorical = True if status_callback: status_callback("Parsing...", reader.line_num / (reader.line_num + 3.0)) self.yy = yy self.yy_text = yy_text self.xx = list(xx) self.xx_text = xx_text self.pp = pp if self.scaled == True: print pp if self.p_format == 'ddp1': sums = sum(pp, axis=1) else: sums = sum(exp(pp), axis=1) if any(sums < .95): raise ValueError("Some columns sum to less than .95") self.unaccounted = 1 - sums return self
[docs] def init_from_other(self, ddp): ''' copy attributes of other DDP dataset to this one ''' self.p_format = ddp.p_format self.source = ddp.source self.xx_is_categorical = ddp.xx_is_categorical self.xx = list(ddp.xx) self.xx_text = ddp.xx_text self.yy_is_categorical = ddp.yy_is_categorical self.yy = list(ddp.yy) self.yy_text = ddp.yy_text self.pp = ddp.pp self.unaccounted = ddp.unaccounted self.scaled = ddp.scaled
[docs] def to_ddp(self, ys=None): ''' coerce to DDP, interpolating along y axis if necessary ''' if ys is None: return self.copy() return self.interpolate_y(ys)
### Memoizable
[docs] def eval_pval_index(self, ii, p, threshold=1e-3): ps = self.lin_p()[ii, :] value = p * sum(ps) total = 0 for ii in range(len(ps)): total += ps[ii] if total > value: return self.yy[ii] return self.yy[-1]
### Class methods
[docs] @staticmethod def from_file(filename, delimiter): ''' read DDP file from file path ''' with open(filename) as fp: model = DDPModel() model.init_from(fp, delimiter) model.source = filename return model
[docs] @staticmethod def create_lin(yy, xxs): ''' Create a DDP model by supplying y index and dictionary of p-values Parameters ---------- yy : list-like y-index labels xxs : dict dictionary keyed with x-index values with p-values for vals ''' pp = None xx = [] for key in xxs: xx.append(key) if pp is None: pp = xxs[key] else: pp = vstack((pp, xxs[key])) return DDPModel('ddp1', 'create_lin', True, xx, False, array(yy), pp)
[docs] @staticmethod def merge(models): # Decide on master x values xx = [] yy = [] for model in models: if not isinstance(model, DDPModel): raise ValueError('Merge only handles ddp models') if not model.scaled: raise ValueError("Only scaled distributions can be merged.") xx = concatenate((xx, model.xx)) yy = concatenate((yy, model.yy)) xx = unique(xx) yy = unique(yy) # Bayesian combination of all models sumlp = zeros((len(xx), len(yy))) for model in models: if len(xx) != len(model.xx) or any(xx != model.xx): model = model.interpolate_x(xx) if len(yy) != len(model.yy) or any(yy != model.yy): model = model.interpolate_y(yy) sumlp = sumlp + model.log_p() # Rescale along each x prodp = exp(sumlp) sums = sum(prodp, axis=1) pp = empty((len(xx), len(yy))) for ii in range(len(xx)): pp[ii,] = prodp[ii,] / sums[ii] return DDPModel('ddp1', 'merge', False, xx, False, yy, pp)
[docs] @staticmethod def combine(one, two): if one.xx_is_categorical != two.xx_is_categorical: raise ValueError("Cannot combine models that do not agree on categoricity") if one.yy_is_categorical or two.yy_is_categorical: raise ValueError("Cannot combine categorical y models") if not one.scaled or not two.scaled: raise ValueError("Cannot combine unscaled models") (one, two, xx) = UnivariateModel.intersect_x(one, two) yy_one_min = min(one.yy) yy_one_max = max(one.yy) yy_two_min = min(two.yy) yy_two_max = max(two.yy) yy_step = min(median(diff(sort(one.yy))), median(diff(sort(two.yy)))) yy_one = arange(yy_one_min, yy_one_max, yy_step) if yy_one[-1] + yy_step == yy_one_max: yy_one = append(yy_one, [yy_one_max]) yy_two = arange(yy_two_min, yy_two_max, yy_step) if yy_two[-1] + yy_step == yy_two_max: yy_two = append(yy_two, [yy_two_max]) if not array_equal(yy_one, one.yy): one = one.interpolate_y(yy_one) if not array_equal(yy_two, two.yy): two = two.interpolate_y(yy_two) pp_one = one.lin_p() pp_two = two.lin_p() newpp = ones((len(xx), len(yy_one) + len(yy_two) - 1)) for ii in range(len(xx)): newpp[ii,] = convolve(pp_one[ii,], pp_two[ii,]) newpp[ii,] = newpp[ii,] / sum(newpp[ii,]) # Scale yy = append(arange(min(yy_one) + min(yy_two), max(yy_one) + max(yy_two), yy_step), [max(yy_one) + max(yy_two)]) return DDPModel('ddp1', 'combine', one.xx_is_categorical, xx, False, yy, newpp, scaled=True)
Model.mergers["ddp_model"] = DDPModel.merge Model.combiners['ddp_model+ddp_model'] = DDPModel.combine #ddp1 = DDPModel.from_file("../test/ddp1.csv", ',') #ddp2 = DDPModel.from_file("../test/ddp2.csv", ',') #merged = DDPModel.merge([ddp1, ddp2]) #merged.write("../test/merge.csv", ',') #ddp2 = DDPModel.from_file("../test/ddp2.csv", ',') #ddp1 = DDPModel('ddp1', 'test', False, [-5, 0, 7], False, ddp2.yy, ones((3, len(ddp2.yy)))) ##ddp1 = DDPModel('ddp1', 'test', False, ddp2.xx, False, ddp2.yy, ones((len(ddp2.xx), len(ddp2.yy)))) #merged = DDPModel.merge([ddp1, ddp2]) #merged.write("../test/merge.csv", ',')