Source code for openest.generate.shortterm

import copy
import numpy as np
from calculation import Calculation, Application, ApplicationEach
from curvegen import CurveGenerator

[docs]class SingleWeatherApply(Calculation): def __init__(self, units, curve, curve_description, weather_change=lambda x: x): super(SingleWeatherApply, self).__init__([units]) if isinstance(curve, CurveGenerator): assert curve.depenunit == units self.curve = curve self.curve_description = curve_description self.weather_change = weather_change
[docs] def apply(self, region, *args): if isinstance(self.curve, CurveGenerator): curve = self.curve.get_curve(region, *args) else: curve = self.curve def generate(region, time, weather, **kw): weather = self.weather_change(weather) result = curve(weather) if not np.isnan(result): yield time, result return ApplicationEach(region, generate)
[docs] def column_info(self): description = "Single value applied to " + self.curve_description return [dict(name='response', title='Direct marginal response', description=description)]
[docs] @staticmethod def describe(): return dict(input_timerate='any', output_timerate='same', arguments=[arguments.output_unit, arguments.curve_or_curvegen, arguments.curve_description, arguments.input_change], description="Apply a curve to each value individually and report it.")
[docs]class MonthlyClimateApply(Calculation): def __init__(self, units, curve, curve_description, monthmeans, regions, weather_change=lambda x: x): super(MonthlyClimateApply, self).__init__([units]) if isinstance(curve, CurveGenerator): assert curve.depenunit == units self.curve = curve self.curve_description = curve_description self.monthmeans = monthmeans self.regions = regions self.weather_change = weather_change
[docs] def apply(self, region, *args): if isinstance(self.curve, CurveGenerator): curve = self.curve.get_curve(region, *args) else: curve = self.curve region_index = self.regions.index(region) def generate(region, time, weather, **kw): # Ignores weather weather = self.weather_change(self.monthmeans[int(time) % 12][region_index]) result = curve(weather) if not np.isnan(result): yield time, result return ApplicationEach(region, generate)
[docs] def column_info(self): description = "Single monthly value from climate applied to " + self.curve_description return [dict(name='climatic', title='Direct climatic response', description=description)]
[docs] @staticmethod def describe(): return dict(input_timerate='any', output_timerate='same', arguments=[arguments.output_unit, arguments.curve_or_curvegen, arguments.curve_description, arguments.monthvalues, arguments.regions, arguments.input_change], description="Apply a curve to each region's climatological monthly value.")
[docs]class InstaZScoreApply(Calculation, Application): def __init__(self, units, curve, curve_description, lasttime, weather_change=lambda x: x): super(InstaZScoreApply, self).__init__([units]) if isinstance(curve, CurveGenerator): assert curve.depenunit == units self.curve = curve self.curve_description = curve_description self.lasttime = lasttime self.weather_change = weather_change self.mean = None # The mean to subtract off self.sdev = None # The sdev to divide by self.pastresults = [] # results before lasttime
[docs] def apply(self, region, *args, **kwargs): app = copy.copy(self) if isinstance(self.curve, CurveGenerator): app.curve = self.curve.get_curve(region, *args) else: app.curve = self.curve app.pastweathers = [] # don't copy this across instances! return app
[docs] def push(self, time, weather): time = time[0] weather = self.weather_change(weather[0]) # Have we collected all the data? if time == self.lasttime or (self.lasttime is None and self.mean is None): self.mean = np.mean(map(lambda x: x[1], self.pastweathers)) self.sdev = np.std(map(lambda x: x[1], self.pastweathers)) # Print out all past weathers, now that we have them for pastweather in self.pastweathers: yield pastweather[0], self.curve((pastweather[1] - self.mean) / self.sdev) if self.mean is None: # Keep track of this until we have a base self.pastweathers.append((time, weather)) else: # calculate this and tack it on yield [time, self.curve((weather - self.mean) / self.sdev)]
[docs] def column_info(self): description = "Single value applied to " + self.curve_description return [dict(name='response', title='Direct marginal response', description=description)]
[docs] @staticmethod def describe(): return dict(input_timerate='any', output_timerate='same', arguments=[arguments.output_unit, arguments.curve_or_curvegen, arguments.curve_description, arguments.time, arguments.input_change], description="Apply a curve to the z-score of inputs, based on values up to a given time.")
[docs]class MonthlyZScoreApply(Calculation, Application): def __init__(self, units, curve, curve_description, monthmeans, monthsdevs, regions, weather_change=lambda x: x): super(MonthlyZScoreApply, self).__init__([units]) if isinstance(curve, CurveGenerator): assert curve.depenunit == units self.curve = curve self.curve_description = curve_description self.monthmeans = monthmeans self.monthsdevs = monthsdevs self.regions = regions self.weather_change = weather_change
[docs] def apply(self, region, *args, **kwargs): app = copy.copy(self) if isinstance(self.curve, CurveGenerator): app.curve = self.curve.get_curve(region, *args) else: app.curve = self.curve app.region_index = self.regions.index(region) return app
[docs] def push(self, time, weather): time = time[0] weather = self.weather_change(weather[0]) yield (time, self.curve((weather - self.monthmeans[time % 12][self.region_index]) / self.monthsdevs[time % 12][self.region_index]))
[docs] def column_info(self): description = "Single value as z-score applied to " + self.curve_description return [dict(name='response', title='Direct marginal response', description=description)]
[docs] @staticmethod def describe(): return dict(input_timerate='month', output_timerate='month', arguments=[arguments.output_unit, arguments.curve_or_curvegen, arguments.curve_description, arguments.monthvalues.describe("Monthly means."), arguments.monthvalues.describe("Monthly standard deviations."), arguments.regions, arguments.input_change], description="Apply a curve to the z-score of inputs, based on monthly means and standard deviations.")
[docs]class SplitByMonth(Calculation): def __init__(self, subcalc): super(SplitByMonth, self).__init__([subcalc.unitses[0]]) self.subcalc = subcalc
[docs] def apply(self, region, *args, **kwargs): bymonth = [self.subcalc.apply(region + '-' + str(month), *args, **kwargs) for month in range(1, 13)] timeminmax = [np.inf, -np.inf] valuebytime = {} def generate(region, time, weather, *args, **kwargs): timeminmax[0] = min(time, timeminmax[0]) timeminmax[1] = max(time, timeminmax[1]) for yearresult in bymonth[time % 12].push([time], [weather]): valuebytime[yearresult[0]] = yearresult[1:] return [] def finish(): for time in range(mintime, maxtime + 1): yield [time] + valuebytime.get(time, [np.nan]) return ApplicationEach(region, generate, finish)
[docs] def column_info(self): description = "Separately applied by month" return [dict(name='split', title='Month split', description=description)]
[docs] @staticmethod def describe(): return dict(input_timerate='month', output_timerate='month', arguments=[arguments.calculation], description="Apply the previous calculation separately for each month.")