Source code for hyperparametertuning

# -*- coding: utf-8 -*-

import copy

import numpy as np

import tqdm

from tsam.timeseriesaggregation import TimeSeriesAggregation
        
def getNoPeriodsForDataReduction(noRawTimeSteps, segmentsPerPeriod, dataReduction):
    """        
    Identifies the maximum number of periods which can be set to achieve the required data reduction.

    :param noRawTimeSteps: Number of original time steps. required
    :type noRawTimeSteps: int

    :param segmentsPerPeriod: Segments per period. required
    :type segmentsPerPeriod: int

    :param dataReduction: Factor by which the resulting dataset should be reduced. required
    :type dataReduction: float

    :returns: **noTypicalPeriods** --  Number of typical periods that can be set.
    """
    return int(np.floor(dataReduction * float(noRawTimeSteps)/segmentsPerPeriod))
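
# A short illustrative check (the numbers are assumptions, not taken from the
# module): with 8760 hourly time steps, 24 segments per period and a data
# reduction factor of 0.01, at most floor(0.01 * 8760 / 24) = 3 typical
# periods can be chosen.
#
#     getNoPeriodsForDataReduction(8760, 24, 0.01)  # -> 3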

def getNoSegmentsForDataReduction(noRawTimeSteps, typicalPeriods, dataReduction):
    """        
    Identifies the maximum number of segments which can be set to achieve the required data reduction.

    :param noRawTimeSteps: Number of original time steps. required
    :type noRawTimeSteps: int

    :param typicalPeriods: Number of typical periods. required
    :type typicalPeriods: int

    :param dataReduction: Factor by which the resulting dataset should be reduced. required
    :type dataReduction: float

    :returns: **segmentsPerPeriod** --  Number of segments per period that can be set.
    """
    return int(np.floor(dataReduction * float(noRawTimeSteps)/typicalPeriods))
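
# A short illustrative check (the numbers are assumptions, not taken from the
# module): with 8760 hourly time steps, 8 typical periods and a data reduction
# factor of 0.01, at most floor(0.01 * 8760 / 8) = 10 segments per period can
# be chosen.
#
#     getNoSegmentsForDataReduction(8760, 8, 0.01)  # -> 10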




class HyperTunedAggregations(object):
    def __init__(self, base_aggregation, saveAggregationHistory=True):
        """
        A class that does a parameter variation and tuning of the aggregation itself.

        :param base_aggregation: TimeSeriesAggregation object which is used as the basis
            for tuning the hyper parameters. required
        :type base_aggregation: TimeSeriesAggregation

        :param saveAggregationHistory: Defines if all aggregations that are created during
            the tuning and iterations shall be saved under self.aggregationHistory.
        :type saveAggregationHistory: boolean
        """
        self.base_aggregation = base_aggregation

        if not isinstance(self.base_aggregation, TimeSeriesAggregation):
            raise ValueError(
                "base_aggregation has to be a TimeSeriesAggregation object"
            )

        self._alterableAggregation = copy.deepcopy(self.base_aggregation)

        self.saveAggregationHistory = saveAggregationHistory
        self._segmentHistory = []
        self._periodHistory = []
        self._RMSEHistory = []

        if self.saveAggregationHistory:
            self.aggregationHistory = []
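
    # A minimal construction sketch (the CSV path, column layout and keyword values
    # are assumptions; segmentation must be enabled for the tuning methods below):
    #
    #     import pandas as pd
    #     raw = pd.read_csv("timeseries.csv", index_col=0, parse_dates=True)
    #     base = TimeSeriesAggregation(raw, hoursPerPeriod=24, segmentation=True)
    #     tuner = HyperTunedAggregations(base, saveAggregationHistory=True)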
    def _testAggregation(self, noTypicalPeriods, noSegments):
        """
        Tests the aggregation for a set of typical periods and segments and
        returns the RMSE.
        """
        self._segmentHistory.append(noSegments)
        self._periodHistory.append(noTypicalPeriods)

        self._alterableAggregation.noTypicalPeriods = noTypicalPeriods
        self._alterableAggregation.noSegments = noSegments

        self._alterableAggregation.createTypicalPeriods()
        self._alterableAggregation.predictOriginalData()

        RMSE = self._alterableAggregation.totalAccuracyIndicators()["RMSE"]
        self._RMSEHistory.append(RMSE)

        if self.saveAggregationHistory:
            self.aggregationHistory.append(copy.copy(self._alterableAggregation))

        return RMSE

    def _deleteTestHistory(self, index):
        """
        Deletes the defined index from the test history.
        """
        del self._segmentHistory[index]
        del self._periodHistory[index]
        del self._RMSEHistory[index]

        if self.saveAggregationHistory:
            del self.aggregationHistory[index]
    def identifyOptimalSegmentPeriodCombination(self, dataReduction):
        """
        Identifies the optimal combination of number of typical periods and number of
        segments for a given data reduction.

        :param dataReduction: Factor by which the resulting dataset should be reduced. required
        :type dataReduction: float

        :returns: **noSegments, noTypicalPeriods, RMSE_min** -- The optimal combination of
            segments and typical periods for the given data reduction, along with the
            resulting RMSE.
        """
        if not self.base_aggregation.segmentation:
            raise ValueError(
                "This function only makes sense in combination with 'segmentation' activated."
            )
        noRawTimeSteps = len(self.base_aggregation.timeSeries.index)

        _maxPeriods = int(
            float(noRawTimeSteps) / self.base_aggregation.timeStepsPerPeriod
        )
        _maxSegments = self.base_aggregation.timeStepsPerPeriod

        # save RMSE of every tested combination
        RMSE_history = []

        # correct for the 0-based indexing of python
        possibleSegments = np.arange(_maxSegments) + 1
        possiblePeriods = np.arange(_maxPeriods) + 1

        # number of time steps of all combinations of segments and periods
        combinedTimeSteps = np.outer(possibleSegments, possiblePeriods)

        # reduce to combinations which are valid for the targeted data reduction
        reductionValidCombinations = combinedTimeSteps <= noRawTimeSteps * dataReduction

        # number of time steps for all feasible combinations
        reductionValidTimeSteps = combinedTimeSteps * reductionValidCombinations

        # identify the max segments and max periods combination
        optimalPeriods = np.zeros_like(reductionValidTimeSteps)
        optimalPeriods[
            np.arange(reductionValidTimeSteps.shape[0]),
            reductionValidTimeSteps.argmax(axis=1),
        ] = 1
        optimalSegments = np.zeros_like(reductionValidTimeSteps)
        optimalSegments[
            reductionValidTimeSteps.argmax(axis=0),
            np.arange(reductionValidTimeSteps.shape[1]),
        ] = 1
        optimalIndexCombo = np.nonzero(optimalPeriods * optimalSegments)

        for segmentIx, periodIx in tqdm.tqdm(
            zip(optimalIndexCombo[0], optimalIndexCombo[1])
        ):
            # derive new typical periods and the resulting RMSE
            RMSE_history.append(
                self._testAggregation(
                    possiblePeriods[periodIx], possibleSegments[segmentIx]
                )
            )

        # take the negative backwards index with the minimal RMSE
        min_index = -list(reversed(RMSE_history)).index(min(RMSE_history)) - 1
        RMSE_min = RMSE_history[min_index]
        noTypicalPeriods = self._periodHistory[min_index]
        noSegments = self._segmentHistory[min_index]

        # and return the segment and typical period pair
        return noSegments, noTypicalPeriods, RMSE_min
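
    # An illustrative call (the data reduction factor is an assumption): to keep
    # roughly 1 % of the original time steps, using the `tuner` instance sketched
    # above:
    #
    #     noSegments, noTypicalPeriods, RMSE_min = \
    #         tuner.identifyOptimalSegmentPeriodCombination(dataReduction=0.01)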
    def identifyParetoOptimalAggregation(self, untilTotalTimeSteps=None):
        """
        Identifies the pareto-optimal combinations of number of typical periods and
        number of segments along a steepest-descent approach, starting from the
        aggregation to a single period and a single segment up to the representation
        of the full time series.

        :param untilTotalTimeSteps: Number of time steps up to which the pareto front
            should be determined. If None, the maximum number of time steps is chosen.
        :type untilTotalTimeSteps: int

        :returns: Nothing. Check the aggregation history for the results.
        """
        if not self.base_aggregation.segmentation:
            raise ValueError(
                "This function only makes sense in combination with 'segmentation' activated."
            )
        noRawTimeSteps = len(self.base_aggregation.timeSeries.index)

        _maxPeriods = int(
            float(noRawTimeSteps) / self.base_aggregation.timeStepsPerPeriod
        )
        _maxSegments = self.base_aggregation.timeStepsPerPeriod

        if untilTotalTimeSteps is None:
            untilTotalTimeSteps = noRawTimeSteps

        progressBar = tqdm.tqdm(total=untilTotalTimeSteps)

        # starting point
        noTypicalPeriods = 1
        noSegments = 1
        _RMSE_0 = self._testAggregation(noTypicalPeriods, noSegments)

        # loop until either segments or periods have reached their maximum
        while (
            noTypicalPeriods < _maxPeriods
            and noSegments < _maxSegments
            and (noSegments + 1) * noTypicalPeriods <= untilTotalTimeSteps
            and noSegments * (noTypicalPeriods + 1) <= untilTotalTimeSteps
        ):
            # test for more segments
            RMSE_segments = self._testAggregation(noTypicalPeriods, noSegments + 1)
            # test for more periods
            RMSE_periods = self._testAggregation(noTypicalPeriods + 1, noSegments)
            # RMSE of the previous step
            RMSE_old = self._RMSEHistory[-3]

            # gradients (RMSE improvement per additional time step)
            # for segments: one segment is added for each typical period
            RMSE_segment_gradient = (RMSE_old - RMSE_segments) / noTypicalPeriods
            # for periods: one period is added with the current number of segments
            RMSE_periods_gradient = (RMSE_old - RMSE_periods) / noSegments

            # go along the steeper gradient
            if RMSE_periods_gradient > RMSE_segment_gradient:
                noTypicalPeriods += 1
                # and delete the search direction which was not pursued
                self._deleteTestHistory(-2)
            else:
                noSegments += 1
                self._deleteTestHistory(-1)
            progressBar.update(noSegments * noTypicalPeriods - progressBar.n)

        # afterwards, increase periods and segments exclusively until the maximum is reached
        while (
            noTypicalPeriods < _maxPeriods
            and noSegments * (noTypicalPeriods + 1) <= untilTotalTimeSteps
        ):
            noTypicalPeriods += 1
            RMSE = self._testAggregation(noTypicalPeriods, noSegments)
            progressBar.update(noSegments * noTypicalPeriods - progressBar.n)

        while (
            noSegments < _maxSegments
            and (noSegments + 1) * noTypicalPeriods <= untilTotalTimeSteps
        ):
            noSegments += 1
            RMSE = self._testAggregation(noTypicalPeriods, noSegments)
            progressBar.update(noSegments * noTypicalPeriods - progressBar.n)

        return
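
# An illustrative call (the time step limit is an assumption): trace the pareto
# front up to 200 represented time steps and inspect the recorded, non-public
# history attributes afterwards:
#
#     tuner.identifyParetoOptimalAggregation(untilTotalTimeSteps=200)
#     for periods, segments, rmse in zip(
#         tuner._periodHistory, tuner._segmentHistory, tuner._RMSEHistory
#     ):
#         print(periods, segments, rmse)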