import numpy as np
import juliatools, latextools, calculation, diagnostic, arguments, formatting
from formatting import FormatElement
"""Scale the results by the value in scale_dict, or the mean value (if it is set).
make_generator: we encapsulate this function, passing in data and operating on outputs
func: default operation is to multiply (scale), but can do other things (e.g., - for re-basing)
"""
class Scale(calculation.Calculation):
    """Transform each result with a region-specific value.

    The second operand for a region is read from `scale_dict`; a region
    without its own entry uses the entry stored under the key 'mean'.

    subcalc: calculation whose leading result is transformed; its first
        unit must equal `from_units`.
    scale_dict: container mapping regions (and 'mean') to operands.
    from_units: units expected from `subcalc`'s leading result.
    to_units: units reported for the transformed result.
    func: binary operation called as func(result, operand); multiplies by
        default, but other operations (e.g. subtraction) can re-base.
    latexpair: pair whose second item is handed to
        latextools.english_function when describing the column.
    """

    def __init__(self, subcalc, scale_dict, from_units, to_units, func=lambda x, y: x*y, latexpair=(r"\bar{I}", "Region-specific scaling")):
        # The new leading unit goes in front of the sub-calculation's units.
        super(Scale, self).__init__([to_units] + subcalc.unitses)
        assert(subcalc.unitses[0] == from_units)

        self.from_units = from_units
        self.latexpair = latexpair
        self.func = func
        self.scale_dict = scale_dict
        self.subcalc = subcalc

    def apply(self, region, *args, **kwargs):
        """Apply the sub-calculation to `region` and wrap it with the scaling step."""
        def generate(year, result):
            # Looked up on every call: fall back to 'mean' for unknown regions.
            key = region if region in self.scale_dict else 'mean'
            return self.func(result, self.scale_dict[key])

        # Prepare the generator from our encapsulated operations
        inner = self.subcalc.apply(region, *args, **kwargs)
        # NOTE(review): unshift=True presumably puts the generated value in
        # front of the sub-calculation's row -- confirm in ApplicationPassCall.
        return calculation.ApplicationPassCall(region, inner, generate, unshift=True)

    def column_info(self):
        """Return the 'scaled' column description followed by the sub-calculation's columns."""
        infos = self.subcalc.column_info()
        source = infos[0]
        title = 'Scaled ' + source['title']
        equation = latextools.english_function(self.func, source['name'], self.latexpair[1])
        description = "Computed from the %s variable, as %s." % (source['name'], equation)
        scaled = dict(name='scaled', title=title, description=description)
        return [scaled] + infos

    @staticmethod
    def describe():
        """Return the metadata dictionary (time rates, argument types, description)."""
        argument_types = [arguments.calculation, arguments.region_dictionary, arguments.input_unit,
                          arguments.output_unit, arguments.input_reduce.optional(),
                          arguments.latexpair]
        return dict(input_timerate='any', output_timerate='same',
                    arguments=argument_types,
                    description="Scale each result by a region-specific value.")
"""
Transform all results by a function.
"""
class Instabase(calculation.CustomFunctionalCalculation):
    """Re-base the results of a sub-calculation to the value in `baseyear`.

    subcalc: the calculation whose leading result is re-based.
    baseyear: the year to use as the 'denominator'; None for the first year.
    func: called as func(value, base value). The default constructs a
        proportional change; `lambda x, y: x - y` makes a simple difference.
    units: units reported for the re-based value.
    skip_on_missing: consulted by `donehandler` when the base year is never
        encountered; the buffered rows are written out unchanged only when
        this is true.
        NOTE(review): the flag name and the earlier documentation suggest
        the opposite polarity -- confirm the intent before changing it.

    Tacks on the re-based value to the front of the results.
    """

    def __init__(self, subcalc, baseyear, func=lambda x, y: x / y, units='portion', skip_on_missing=True):
        # NOTE(review): the trailing arguments are presumably handed back to
        # pushhandler/donehandler by the base class -- confirm there.
        super(Instabase, self).__init__(subcalc, subcalc.unitses[0], units, True, baseyear, func, skip_on_missing)
        self.baseyear = baseyear
        self.denom = None  # The value in the baseyear
        self.pastresults = []  # results before baseyear

    def init_apply(self):
        """Reset the per-application buffer."""
        self.pastresults = []  # don't copy this across instances!

    def pushhandler(self, ds, baseyear, func, skip_on_missing):
        """
        Returns an iterator of [yyyy, rebased value, original value, ...].

        Rows seen before the base year are buffered and emitted, re-based,
        as soon as the base value becomes known.
        """
        for yearresult in self.subapp.push(ds):
            year = yearresult[0]
            result = yearresult[1]

            # Should we base everything off this year?
            if year == baseyear or (baseyear is None and self.denom is None):
                self.denom = result

                # Print out all past results, relative to this year.
                # A plain local is used so no loop state leaks onto self.
                for pastresult in self.pastresults:
                    yield [pastresult[0], func(pastresult[1], self.denom)] + list(pastresult[1:])

            if self.denom is None:
                # Keep track of this until we have a base
                self.pastresults.append(yearresult)
            else:
                # calculate this and tack it on
                yield [year, func(result, self.denom)] + list(yearresult[1:])

    def donehandler(self, baseyear, func, skip_on_missing):
        """Emit the buffered rows when no base value was ever established.

        NOTE(review): rows are yielded exactly as buffered, i.e. without a
        re-based value in front, and only when `skip_on_missing` is true.
        Both look unintended; behaviour is left untouched pending review.
        """
        if self.denom is None and skip_on_missing:
            # Never got to this year: just write out results
            for pastresult in self.pastresults:
                yield pastresult

    def column_info(self):
        """Return the 'rebased' column description followed by the sub-calculation's columns."""
        infos = self.subcalc.column_info()
        title = 'Rebased ' + infos[0]['title']
        # baseyear=None means "the first year"; %d cannot format None.
        if self.baseyear is None:
            basetext = "the first year"
        else:
            basetext = "the year %d" % self.baseyear
        description = "The result calculated relative to %s, by re-basing variable %s." % (basetext, infos[0]['name'])
        return [dict(name='rebased', title=title, description=description)] + infos

    @staticmethod
    def describe():
        """Return the metadata dictionary (time rates, argument types, description)."""
        return dict(input_timerate='any', output_timerate='same',
                    arguments=[arguments.calculation, arguments.year,
                               arguments.input_reduce.optional(), arguments.output_unit.optional(),
                               arguments.skip_on_missing.optional()],
                    description="Translate all results relative to the baseline year.")
class SpanInstabase(Instabase):
    """Re-base the results of a calculation to the average of values between two years.

    year1, year2: first and last year of the baseline span. Values from
        rows with year >= year1 are collected until the row for year2
        arrives, at which point their mean becomes the baseline.
    func: called as func(value, baseline). The default constructs a
        proportional change; `lambda x, y: x - y` makes a simple difference.
    units: units reported for the re-based value.
    skip_on_missing: forwarded to Instabase, whose donehandler consults it
        when no baseline was ever established.
    """

    def __init__(self, subcalc, year1, year2, func=lambda x, y: x / y, units='portion', skip_on_missing=True):
        # The parent records a single base year; the span's midpoint is used.
        midyear = (year1 + year2) / 2
        super(SpanInstabase, self).__init__(subcalc, midyear, func, units, skip_on_missing)
        self.year1 = year1
        self.year2 = year2
        self.denomterms = []

    def init_apply(self):
        """Reset the per-application buffers."""
        # don't copy these across instances!
        self.pastresults = []
        self.denomterms = []

    def pushhandler(self, ds, baseyear, func, skip_on_missing):
        """
        Returns an iterator of [yyyy, rebased value, original value, ...].

        Rows are buffered until the baseline is known; the buffered rows
        are then emitted re-based, followed by every later row.
        """
        for row in self.subapp.push(ds):
            year, value = row[0], row[1]

            # Should we base everything off this year?
            if year == self.year2:
                self.denomterms.append(value)
                # With the delta method the terms are averaged along axis 0.
                if self.deltamethod:
                    self.denom = np.mean(self.denomterms, 0)
                else:
                    self.denom = np.mean(self.denomterms)

                # Print out all past results, re-based
                for earlier in self.pastresults:
                    diagnostic.record(self.region, earlier[0], 'baseline', self.denom)
                    yield [earlier[0], func(earlier[1], self.denom)] + list(earlier[1:])

            if self.denom is not None:
                diagnostic.record(self.region, year, 'baseline', self.denom)
                # calculate this and tack it on
                yield [year, func(value, self.denom)] + list(row[1:])
                continue

            # Keep track of this until we have a base
            self.pastresults.append(row)
            if year >= self.year1:
                self.denomterms.append(value)

    @staticmethod
    def describe():
        """Return the metadata dictionary (time rates, argument types, description)."""
        argument_types = [arguments.calculation, arguments.year.describe("The starting year"),
                          arguments.year.describe("The ending year"),
                          arguments.input_reduce.optional(), arguments.output_unit.optional(),
                          arguments.skip_on_missing.optional()]
        return dict(input_timerate='any', output_timerate='same',
                    arguments=argument_types,
                    description="Translate all results relative to a span of baseline years.")
class InstaZScore(calculation.CustomFunctionalCalculation):
    """
    Collects the values before `lastyear` and then uses them to represent all values as a z-score.

    subcalc: the calculation whose leading result is standardised.
    lastyear: the year at which collection stops; the mean and standard
        deviation are computed from the rows buffered before it.
        NOTE(review): with lastyear=None the statistics are computed on the
        first row, when nothing has been buffered yet, and `column_info`
        cannot format None with %d -- confirm whether None is supported.
    units: units reported for the z-score.
    """

    def __init__(self, subcalc, lastyear, units='z-score'):
        super(InstaZScore, self).__init__(subcalc, subcalc.unitses[0], units, True, lastyear)
        self.lastyear = lastyear
        self.mean = None  # The mean to subtract off
        self.sdev = None  # The sdev to divide by
        self.pastresults = []  # results before lastyear

    def init_apply(self):
        """Reset the per-application buffer."""
        self.pastresults = []  # don't copy this across instances!

    def pushhandler(self, ds, lastyear):
        """
        Returns an iterator of [yyyy, z-score, original value, ...].

        Rows are buffered until the statistics are known; the buffered rows
        are then emitted as z-scores, followed by every later row.
        """
        for yearresult in self.subapp.push(ds):
            year = yearresult[0]
            result = yearresult[1]

            # Have we collected all the data?
            if year == lastyear or (lastyear is None and self.mean is None):
                # Build a real list once: on Python 3 `map` returns an
                # iterator, which numpy cannot reduce.
                pastvalues = [pastresult[1] for pastresult in self.pastresults]
                self.mean = np.mean(pastvalues)
                self.sdev = np.std(pastvalues)

                # Print out all past results, now that we have them
                for pastresult in self.pastresults:
                    yield [pastresult[0], (pastresult[1] - self.mean) / self.sdev] + list(pastresult[1:])

            if self.mean is None:
                # Keep track of this until we have a base
                self.pastresults.append(yearresult)
            else:
                # calculate this and tack it on
                yield [year, (result - self.mean) / self.sdev] + list(yearresult[1:])

    def column_info(self):
        """Return the 'zscore' column description followed by the sub-calculation's columns."""
        infos = self.subcalc.column_info()
        title = 'Z-Score of ' + infos[0]['title']
        description = "Z-scores of %s calculated relative to the years before %d." % (infos[0]['name'], self.lastyear)
        return [dict(name='zscore', title=title, description=description)] + infos

    @staticmethod
    def describe():
        """Return the metadata dictionary (time rates, argument types, description)."""
        return dict(input_timerate='any', output_timerate='same',
                    arguments=[arguments.calculation, arguments.year,
                               arguments.output_unit.optional()],
                    description="Translate all results to z-scores against results up to a given year.")
"""
Sum two or more results
"""
class Sum(calculation.Calculation):
    """Sum the leading results of two or more calculations.

    subcalcs: sequence of calculations; the leading unit of each one must
        match the leading unit of the first.
    """

    def __init__(self, subcalcs):
        leadunit = subcalcs[0].unitses[0]
        fullunitses = subcalcs[0].unitses[:]
        for subcalc in subcalcs[1:]:
            assert leadunit == subcalc.unitses[0], "%s <> %s" % (leadunit, subcalc.unitses[0])
            fullunitses.extend(subcalc.unitses)

        # The sum shares the leading unit and goes in front of all sub-calculation units.
        super(Sum, self).__init__([leadunit] + fullunitses)
        self.subcalcs = subcalcs

    def apply(self, region, *args, **kwargs):
        """Apply every sub-calculation to `region` and wrap them with the summing step."""
        def generate(year, results):
            # A missing (None) row contributes NaN. A real list is built
            # because on Python 3 `map` returns an iterator, which np.sum
            # cannot reduce.
            values = [x[1] if x is not None else np.nan for x in results]
            if not self.deltamethod:
                return np.sum(values)
            # With the delta method the terms are summed along axis 0.
            return np.sum(values, 0)

        # Prepare the generator from our encapsulated operations
        subapps = [subcalc.apply(region, *args, **kwargs) for subcalc in self.subcalcs]
        return calculation.ApplicationPassCall(region, subapps, generate, unshift=True)

    def column_info(self):
        """Return the 'sum' column description followed by every sub-calculation's columns."""
        infoses = [subcalc.column_info() for subcalc in self.subcalcs]
        title = 'Sum of previous results'
        description = 'Sum of ' + ', '.join([infos[0]['title'] for infos in infoses])

        fullinfos = []
        for infos in infoses:
            fullinfos.extend(infos)

        return [dict(name='sum', title=title, description=description)] + fullinfos

    def enable_deltamethod(self):
        """Switch this calculation and all of its sub-calculations to the delta method."""
        self.deltamethod = True
        for subcalc in self.subcalcs:
            subcalc.enable_deltamethod()

    @staticmethod
    def describe():
        """Return the metadata dictionary (time rates, argument types, description)."""
        return dict(input_timerate='any', output_timerate='same',
                    arguments=[arguments.calculationss],
                    description="Sum the results of multiple previous calculations.")
"""
ConstantScale
"""
class ConstantScale(calculation.Calculation):
    """Multiply each result by a constant coefficient.

    subcalc: the calculation whose leading result is multiplied.
    coeff: the constant factor; the leading unit is reported unchanged.
    """

    def __init__(self, subcalc, coeff):
        leadunit = subcalc.unitses[0]
        super(ConstantScale, self).__init__([leadunit] + subcalc.unitses)
        self.coeff = coeff
        self.subcalc = subcalc

    def apply(self, region, *args, **kwargs):
        """Apply the sub-calculation to `region` and wrap it with the multiplication."""
        def generate(year, result):
            return self.coeff * result

        # Prepare the generator from our encapsulated operations
        inner = self.subcalc.apply(region, *args, **kwargs)
        return calculation.ApplicationPassCall(region, inner, generate, unshift=True)

    def column_info(self):
        """Return the 'constscale' column description followed by the sub-calculation's columns."""
        infos = self.subcalc.column_info()
        # Title and description carry the same text.
        text = 'Previous result multiplied by %f' % self.coeff
        column = dict(name='constscale', title=text, description=text)
        return [column] + infos

    @staticmethod
    def describe():
        """Return the metadata dictionary (time rates, argument types, description)."""
        argument_types = [arguments.calculation, arguments.coefficient]
        return dict(input_timerate='any', output_timerate='same',
                    arguments=argument_types,
                    description="Multiply the result by a constant factor.")
class Positive(calculation.Calculation):
    """
    Pass a result through when it is greater than 0; otherwise return 0.

    subcalc: the calculation whose leading result is clipped; the leading
        unit is reported unchanged.
    """

    def __init__(self, subcalc):
        leadunit = subcalc.unitses[0]
        super(Positive, self).__init__([leadunit] + subcalc.unitses)
        self.subcalc = subcalc

    def apply(self, region, *args, **kwargs):
        """Apply the sub-calculation to `region` and wrap it with the clipping step."""
        def generate(year, result):
            # Anything that does not compare greater than 0 becomes 0.
            if result > 0:
                return result
            return 0

        # Prepare the generator from our encapsulated operations
        inner = self.subcalc.apply(region, *args, **kwargs)
        return calculation.ApplicationPassCall(region, inner, generate, unshift=True)

    def column_info(self):
        """Return the 'positive' column description followed by the sub-calculation's columns."""
        infos = self.subcalc.column_info()
        source = infos[0]
        title = 'Positive-only form of ' + source['title']
        description = 'The value of ' + source['title'] + ', if positive and otherwise 0.'
        column = dict(name='positive', title=title, description=description)
        return [column] + infos

    @staticmethod
    def describe():
        """Return the metadata dictionary (time rates, argument types, description)."""
        argument_types = [arguments.calculation]
        return dict(input_timerate='any', output_timerate='same',
                    arguments=argument_types,
                    description="Return the maximum of a previous result or 0.")
class Exponentiate(calculation.Calculation):
    """Exponentiate each result of a calculation expressed in log units.

    subcalc: the calculation whose leading result is exponentiated. Its
        leading unit must start with 'log'; the reported unit is that
        string with the first three characters removed and surrounding
        whitespace stripped.
    """

    def __init__(self, subcalc):
        logunit = subcalc.unitses[0]
        assert logunit.startswith('log')
        super(Exponentiate, self).__init__([logunit[3:].strip()] + subcalc.unitses)
        self.subcalc = subcalc

    def apply(self, region, *args, **kwargs):
        """Apply the sub-calculation to `region` and wrap it with the exponentiation."""
        def generate(year, result):
            return np.exp(result)

        # Prepare the generator from our encapsulated operations
        inner = self.subcalc.apply(region, *args, **kwargs)
        return calculation.ApplicationPassCall(region, inner, generate, unshift=True)

    def column_info(self):
        """Return the 'exp' column description followed by the sub-calculation's columns."""
        infos = self.subcalc.column_info()
        source_title = infos[0]['title']
        title = 'exp(' + source_title + ')'
        description = 'Exponentiation of ' + source_title
        column = dict(name='exp', title=title, description=description)
        return [column] + infos

    @staticmethod
    def describe():
        """Return the metadata dictionary (time rates, argument types, description)."""
        argument_types = [arguments.calculation]
        return dict(input_timerate='any', output_timerate='same',
                    arguments=argument_types,
                    description="Return the the exponentiation of a previous result.")
class AuxillaryResult(calculation.Calculation):
    """
    Produce an additional output, but then pass the main result on.

    subcalc_main: the calculation providing the main result and the
        remaining columns.
    subcalc_aux: the calculation whose leading result becomes the extra
        column, placed directly behind the main result.
    auxname: the column name reported for the auxiliary result.
    """

    def __init__(self, subcalc_main, subcalc_aux, auxname):
        # Units are ordered as main[0], aux[0], main[1:].
        main_units = subcalc_main.unitses
        aux_unit = subcalc_aux.unitses[0]
        super(AuxillaryResult, self).__init__([main_units[0], aux_unit] + main_units[1:])
        self.auxname = auxname
        self.subcalc_aux = subcalc_aux
        self.subcalc_main = subcalc_main

    def apply(self, region, *args, **kwargs):
        """Apply both sub-calculations to `region` and combine them in one application."""
        main_app = self.subcalc_main.apply(region, *args, **kwargs)
        aux_app = self.subcalc_aux.apply(region, *args, **kwargs)
        return AuxillaryResultApplication(region, main_app, aux_app)

    def column_info(self):
        """Return the column descriptions ordered as main[0], all auxiliary columns, main[1:]."""
        main_infos = self.subcalc_main.column_info()
        aux_infos = self.subcalc_aux.column_info()
        # The auxiliary result is reported under the caller-supplied name.
        aux_infos[0]['name'] = self.auxname
        leading = [main_infos[0]]
        return leading + aux_infos + main_infos[1:]

    @staticmethod
    def describe():
        """Return the metadata dictionary (time rates, argument types, description)."""
        # NOTE(review): __init__ also takes `auxname`, which has no entry here -- confirm.
        argument_types = [arguments.calculation, arguments.calculation.describe("An auxillary calculation, placed behind the main calculation.")]
        return dict(input_timerate='any', output_timerate='same',
                    arguments=argument_types,
                    description="Add an additional result to the columns.")
class AuxillaryResultApplication(calculation.Application):
    """
    Perform both main and auxiliary calculation, and order as main[0], aux, main[1:]

    region: passed on to calculation.Application.
    subapp_main: application providing the rows that are passed on.
    subapp_aux: application whose latest value is inserted into each row.
    """

    def __init__(self, region, subapp_main, subapp_aux):
        super(AuxillaryResultApplication, self).__init__(region)
        self.subapp_main = subapp_main
        self.subapp_aux = subapp_aux

    def push(self, ds):
        """Yield each main row with the auxiliary value inserted at index 2.

        For every row produced by the main application, `ds` is pushed to
        the auxiliary application and the value of its last row is used.
        If the auxiliary application yields nothing for a later row, the
        most recent auxiliary row seen during this call is reused.

        Raises ValueError if a main row is produced before the auxiliary
        application has yielded any row during this call.
        """
        aux_row = None
        for yearresult in self.subapp_main.push(ds):
            for aux_row in self.subapp_aux.push(ds):
                pass  # Exhaust the generator: just take the last one

            if aux_row is None:
                # Previously an UnboundLocalError on an undefined loop variable.
                raise ValueError("The auxiliary calculation produced no result to pair with the main result.")

            yield list(yearresult[0:2]) + [aux_row[1]] + list(yearresult[2:])

    def done(self):
        """Signal completion to both wrapped applications."""
        self.subapp_main.done()
        self.subapp_aux.done()