Source code for openest.models.bin_model

# -*- coding: utf-8 -*-
################################################################################
# Copyright 2014, The Open Aggregator
#   GNU General Public License, Ver. 3 (see docs/license.txt)
################################################################################
__copyright__ = "Copyright 2014, The Open Aggregator"
__license__ = "GPL"

__author__ = "James Rising"
__credits__ = ["James Rising", "Amir Jina"]
__maintainer__ = "James Rising"
__email__ = "jar2234@columbia.edu"

__status__ = "Production"
__version__ = "$Revision$"
# $Source$

import csv, string
import numpy as np

from model import Model
from univariate_model import UnivariateModel
from memoizable import MemoizableUnivariate

[docs]class BinModel(UnivariateModel, MemoizableUnivariate): ''' Bin Model A bin model represents bins of different spans, where the distribution is constant over each bin. It is a combination of information describing the bins and an underlying categorical model of one of the other types. The underlying model is always categorical, with categories starting at 1. 0 is reserved for a future version that allows an out-of-sample distribution The format is:: bin1 <x0>,<x1>,<x2>, ... <underlying model> Parameters ---------- xx : list-like List-like array of bin edges. `len(xx)` should be one more than the number of bins. model : object Statistical model used in each bin ''' def __init__(self, xx=None, model=None): super(BinModel, self).__init__(False, xx, model.scaled if model is not None else False) self.model = model
[docs] def kind(self): ''' returns model type ("bin_model") ''' return 'bin_model'
[docs] def copy(self): ''' copy data and return BinModel with the same data ''' return BinModel(list(self.xx), self.model.copy())
[docs] def get_xx(self): ''' returns x axis index ''' return self.xx
[docs] def get_xx_centers(self): ''' returns x axis index ''' centers = (np.array(self.xx[:-1]) + np.array(self.xx[1:])) / 2 if centers[0] == -np.inf: centers[0] = self.xx[1] - 10 if centers[-1] == np.inf: centers[-1] = self.xx[-2] + 10 return centers
[docs] def scale_y(self, a): ''' Scales y-axes of underlying bin models Interface to `self.model.scale_y(a)` ''' self.model.scale_y(a) return self
[docs] def scale_p(self, a): ''' Scales p-values of underlying bin models (in log_p format) Interface to `self.model.scale_p`. ''' self.model.scale_p(a) return self
[docs] def filter_x(self, xx): ''' Returns new :py:class:`~.models.bin_model.BinModel` ''' bins = [] for x in xx: bins.append(self.xx.index(x) + 1) model = self.model.filter_x(bins) return BinModel(xx, model, scaled=self.model.scaled)
[docs] def interpolate_x(self, newxx): ''' Returns a copy of the model. *Does not interpolate.* ''' return self.copy()
[docs] def write_file(self, filename, delimiter): ''' Write model as delimited document to filepath Wrapper around :py:meth:`~.models.bin_model.BinModel.write` method. Parameters ---------- filename : str Path to file to be written delimiter : str Delimiter to use in file (e.g. '\t', ',') ''' with open(filename, 'w') as fp: self.write(fp, delimiter)
[docs] def write(self, file, delimiter): ''' Write model as delimited document to file-like object Prepends model type (``bin1``) and bin borders (:py:attr:`~.models.bin_model.BinModel.xx`) to document written by ``self.model.write``. Parameters ---------- file : object file-like object delimiter : str Delimiter to use in file (e.g. '\t', ',') ''' file.write("bin1\n") file.write(delimiter.join(map(str, self.xx)) + "\n") self.model.write(file, delimiter)
[docs] def get_bin_at(self, x): ''' Returns bin containing value *x* Parameters ---------- x : numeric Value to search for in binned axis Returns ------- int Returns index of bin containing *x*. If bin is not contained in the bin range, returns ``-1``. ''' for ii in range(len(self.xx)-1): if self.xx[ii] <= x and self.xx[ii+1] > x: return ii return -1
[docs] def to_points_at(self, x, ys): return self.model.to_points_at(self.get_bin_at(x), ys)
[docs] def eval_pval(self, x, p, threshold=1e-3): return self.model.eval_pval(self.get_bin_at(x), p, threshold)
[docs] def cdf(self, x, y): return self.model.cdf(self.get_bin_at(x), y)
[docs] def get_mean(self, x=None, index=None): if index is None: index = self.get_bin_at(x) if index == -1: return np.nan return self.model.get_mean(self.model.get_xx()[index])
[docs] def get_sdev(self, x=None, index=None): if index is None: index = self.get_bin_at(x) return self.model.get_sdev(index)
[docs] def draw_sample(self, x=None): return self.model.draw_sample(self.get_bin_at(x))
[docs] def init_from_bin_file(self, file, delimiter, status_callback=None, init_submodel=lambda fp: None): line = string.strip(file.readline()) if line != "bin1": raise ValueError("Unknown format: %s" % (line)) reader = csv.reader(file, delimiter=delimiter) row = reader.next() self.xx_text = row self.xx = map(float, row) self.xx_is_categorical = False self.model = init_submodel(file) # Need to set this! if self.model is not None: self.scaled = self.model.scaled return self
[docs] def to_ddp(self, ys=None): newcats = [] newxx = [self.xx[0]] for ii in range(1, len(self.xx)): newcats.extend([ii, ii]) diff = (self.xx[ii] - self.xx[ii-1]) / 100.0 newxx.extend([self.xx[ii] - diff, self.xx[ii] + diff]) newxx[-1] = self.xx[-1] dupmodel = self.model.recategorize_x(newcats, range(1, len(newcats)+1)) dupmodel = dupmodel.to_ddp(ys) dupmodel.xx = newxx dupmodel.xx_text = map(str, newxx) dupmodel.xx_is_categorical = False return dupmodel
### Memoizable
[docs] def get_edges(self): ''' Returns bin edges (duplicate of :py:meth:`~.models.bin_model.BinModel.get_xx`) ''' return self.xx
[docs] def eval_pval_index(self, ii, p, threshold=1e-3): return self.model.eval_pval_index(ii, p, threshold)
### Class Methods
[docs] @staticmethod def consistent_bins(models): ''' All models are BinModels ''' allxx = set() for model in models: if not model.scaled: raise ValueError("Only scaled distributions can be merged.") allxx.update(set(model.xx)) allxx = np.array(sorted(allxx)) midpts = (allxx[1:] + allxx[:-1]) / 2 midpts[midpts == -np.inf] = min(allxx[allxx > -np.inf]) - 10. midpts[midpts == np.inf] = max(allxx[allxx < np.inf]) + 10. newmodels = [] for model in models: allbins = [model.get_bin_at(x) for x in midpts] allxxs = [model.model.get_xx()[bin] if bin >= 0 else np.nan for bin in allbins] newmodel = model.model.recategorize_x(allxxs, range(0, len(allxx)-1)) newmodels.append(BinModel(allxx, newmodel)) return newmodels
[docs] @staticmethod def merge(models): ''' All models are BinModels ''' newmodels = BinModel.consistent_bins(models) allmodel = Model.merge(map(lambda m: m.model, newmodels)) model = BinModel(newmodels[0].get_xx(), allmodel) return model
[docs] @staticmethod def combine(one, two): ''' Both models are BinModels ''' if not one.scaled or not two.scaled: raise ValueError("Cannot combine unscaled models") (one, two, xx) = UnivariateModel.intersect_x(one, two) allxx = set(one.xx) | set(two.xx) allxx = np.array(allxx) midpts = (allxx[1:] + allxx[:-1]) / 2 onemodel = one.model.recategorize_x(map(model.get_bin_at, midpts), range(1, len(allxx))) twomodel = two.model.recategorize_x(map(model.get_bin_at, midpts), range(1, len(allxx))) model = Model.combine([onemodel, twomodel], [1, 1]) return BinModel(allxx, model, True)
from ddp_model import DDPModel Model.mergers["bin_model"] = BinModel.merge Model.mergers["bin_model+ddp_model"] = lambda models: DDPModel.merge(map(lambda m: m.to_ddp(), models)) Model.mergers["bin_model+spline_model"] = lambda models: DDPModel.merge(map(lambda m: m.to_ddp(), models)) Model.combiners['bin_model+bin_model'] = BinModel.combine Model.combiners["bin_model+ddp_model"] = lambda one, two: DDPModel.combine(one.to_ddp(), two) Model.combiners["bin_model+spline_model"] = lambda one, two: DDPModel.combine(one.to_ddp(), two)