Source code for segmentation
# -*- coding: utf-8 -*-
import numpy as np
import pandas as pd
from sklearn.cluster import AgglomerativeClustering
from tsam.representations import representations
[docs]def segmentation(
normalizedTypicalPeriods,
noSegments,
timeStepsPerPeriod,
representationMethod=None,
representationDict=None,
distributionPeriodWise=True,
):
"""
Agglomerative clustering of adjacent time steps within a set of typical periods in order to further reduce the
temporal resolution within typical periods and to further reduce complexity of input data.
:param normalizedTypicalPeriods: MultiIndex DataFrame containing the typical periods as first index, the time steps
within the periods as second index and the attributes as columns.
:type normalizedTypicalPeriods: pandas DataFrame
:param noSegments: Number of segments in which the typical periods should be subdivided - equivalent to the number of
inner-period clusters.
:type noSegments: integer
:param timeStepsPerPeriod: Number of time steps per period
:type timeStepsPerPeriod: integer
:returns: - **segmentedNormalizedTypicalPeriods** (pandas DataFrame) -- MultiIndex DataFrame similar to
normalizedTypicalPeriods but with segments instead of time steps. Moreover, two additional index
levels define the length of each segment and the time step index at which each segment starts.
- **predictedSegmentedNormalizedTypicalPeriods** (pandas DataFrame) -- MultiIndex DataFrame with the same
shape of normalizedTypicalPeriods, but with overwritten values derived from segmentation used for
prediction of the original periods and accuracy indicators.
"""
# Initialize lists for predicted and segmented DataFrame
segmentedNormalizedTypicalPeriodsList = []
predictedSegmentedNormalizedTypicalPeriodsList = []
# do for each typical period
for i in normalizedTypicalPeriods.index.get_level_values(0).unique():
# make numpy array with rows containing the segmenatation candidates (time steps)
# and columns as dimensions of the
segmentationCandidates = np.asarray(normalizedTypicalPeriods.loc[i, :])
# produce adjacency matrix: Each time step is only connected to its preceding and succeeding one
adjacencyMatrix = np.eye(timeStepsPerPeriod, k=1) + np.eye(
timeStepsPerPeriod, k=-1
)
# execute clustering of adjacent time steps
if noSegments == 1:
clusterOrder = np.asarray([0] * len(segmentationCandidates))
else:
clustering = AgglomerativeClustering(
n_clusters=noSegments, linkage="ward", connectivity=adjacencyMatrix
)
clusterOrder = clustering.fit_predict(segmentationCandidates)
# determine the indices where the segments change and the number of time steps in each segment
segNo, indices, segmentNoOccur = np.unique(
clusterOrder, return_index=True, return_counts=True
)
clusterOrderUnique = [clusterOrder[index] for index in sorted(indices)]
# determine the segments' values
clusterCenters, clusterCenterIndices = representations(
segmentationCandidates,
clusterOrder,
default="meanRepresentation",
representationMethod=representationMethod,
representationDict=representationDict,
distributionPeriodWise=distributionPeriodWise,
timeStepsPerPeriod=1,
)
# clusterCenters = meanRepresentation(segmentationCandidates, clusterOrder)
# predict each time step of the period by representing it with the corresponding segment's values
predictedSegmentedNormalizedTypicalPeriods = (
pd.DataFrame(clusterCenters, columns=normalizedTypicalPeriods.columns)
.reindex(clusterOrder)
.reset_index(drop=True)
)
# represent the period by the segments in the right order only instead of each time step
segmentedNormalizedTypicalPeriods = (
pd.DataFrame(clusterCenters, columns=normalizedTypicalPeriods.columns)
.reindex(clusterOrderUnique)
.set_index(np.sort(indices))
)
# keep additional information on the lengths of the segments in the right order
segmentDuration = (
pd.DataFrame(segmentNoOccur, columns=["Segment Duration"])
.reindex(clusterOrderUnique)
.set_index(np.sort(indices))
)
# create DataFrame with reduced number of segments together with three indices per period:
# 1. The segment number
# 2. The segment duration
# 3. The index of the original time step, at which the segment starts
result = segmentedNormalizedTypicalPeriods.set_index(
[
pd.Index(segNo, name="Segment Step"),
segmentDuration["Segment Duration"],
pd.Index(np.sort(indices), name="Original Start Step"),
]
)
# append predicted and segmented DataFrame to list to create a big DataFrame for all periods
predictedSegmentedNormalizedTypicalPeriodsList.append(
predictedSegmentedNormalizedTypicalPeriods
)
segmentedNormalizedTypicalPeriodsList.append(result)
# create a big DataFrame for all periods for predicted segmented time steps and segments and return
predictedSegmentedNormalizedTypicalPeriods = pd.concat(
predictedSegmentedNormalizedTypicalPeriodsList,
keys=normalizedTypicalPeriods.index.get_level_values(0).unique(),
).rename_axis(["", "TimeStep"])
segmentedNormalizedTypicalPeriods = pd.concat(
segmentedNormalizedTypicalPeriodsList,
keys=normalizedTypicalPeriods.index.get_level_values(0).unique(),
)
return segmentedNormalizedTypicalPeriods, predictedSegmentedNormalizedTypicalPeriods