Skip to content

tsam.tuning

tsam.tuning

Hyperparameter tuning for tsam aggregation.

This module provides functions for finding optimal aggregation parameters.

TuningResult dataclass

Result of hyperparameter tuning.

Attributes:

Name Type Description
n_clusters int

Optimal number of typical periods.

n_segments int

Optimal number of segments per period.

rmse float

RMSE of the optimal configuration.

history list[dict]

History of all tested configurations with their RMSE values.

best_result AggregationResult

The AggregationResult for the optimal configuration.

all_results list[AggregationResult]

All AggregationResults from tuning.

Examples:

>>> result = find_optimal_combination(df, data_reduction=0.01)
>>> result.summary  # DataFrame of all tested configs
>>> result.plot()   # Visualize results
>>> pareto = find_pareto_front(df, max_timesteps=500)
>>> pareto.find_by_timesteps(100)  # Find config closest to 100 timesteps
>>> for agg_result in pareto:      # Iterate over AggregationResults
...     print(agg_result.accuracy.rmse.mean())
Source code in src/tsam/tuning.py
@dataclass
class TuningResult:
    """Result of hyperparameter tuning.

    Besides the optimal configuration, the object keeps the search history
    and (optionally) every tested ``AggregationResult``. It behaves like a
    read-only sequence over ``all_results`` (``len``, indexing, iteration).

    Attributes
    ----------
    n_clusters : int
        Optimal number of typical periods.
    n_segments : int
        Optimal number of segments per period.
    rmse : float
        RMSE of the optimal configuration.
    history : list[dict]
        History of all tested configurations with their RMSE values.
        Each entry holds ``"n_clusters"``, ``"n_segments"`` and ``"rmse"``
        and may additionally hold a precomputed ``"timesteps"`` value.
    best_result : AggregationResult
        The AggregationResult for the optimal configuration.
    all_results : list[AggregationResult]
        All AggregationResults from tuning.

    Examples
    --------
    >>> result = find_optimal_combination(df, data_reduction=0.01)
    >>> result.summary  # DataFrame of all tested configs
    >>> result.plot()   # Visualize results

    >>> pareto = find_pareto_front(df, max_timesteps=500)
    >>> pareto.find_by_timesteps(100)  # Find config closest to 100 timesteps
    >>> for agg_result in pareto:      # Iterate over AggregationResults
    ...     print(agg_result.accuracy.rmse.mean())
    """

    n_clusters: int
    n_segments: int
    rmse: float
    history: list[dict]
    best_result: AggregationResult
    all_results: list[AggregationResult] = field(default_factory=list)

    @staticmethod
    def _entry_timesteps(entry: dict) -> int:
        """Return the timestep count of one history entry.

        A recorded ``"timesteps"`` value takes precedence; otherwise the
        count is derived as ``n_clusters * n_segments``.
        """
        if "timesteps" in entry:
            return entry["timesteps"]
        return entry["n_clusters"] * entry["n_segments"]

    def _require_results(self) -> None:
        """Raise ValueError unless ``all_results`` lines up with ``history``."""
        if not self.all_results:
            raise ValueError(
                "No results available. Use save_all_results=True in "
                "find_optimal_combination() or use find_pareto_front() instead."
            )

        if len(self.all_results) != len(self.history):
            raise ValueError(
                f"Results/history mismatch: {len(self.all_results)} results vs "
                f"{len(self.history)} history entries. This may indicate "
                "save_all_results was not enabled."
            )

    @property
    def summary(self) -> pd.DataFrame:
        """Summary DataFrame of all tested configurations.

        A ``timesteps`` column (``n_clusters * n_segments``) is added when
        the history does not already provide one.
        """
        df = pd.DataFrame(self.history)
        if "timesteps" not in df.columns and len(df) > 0:
            df["timesteps"] = df["n_clusters"] * df["n_segments"]
        return df

    def find_by_timesteps(self, target: int) -> AggregationResult:
        """Find the result closest to a target timestep count.

        Parameters
        ----------
        target : int
            Desired number of timesteps.

        Returns
        -------
        AggregationResult
            The stored result whose timestep count is nearest to ``target``;
            on ties the earliest tested configuration wins.

        Raises
        ------
        ValueError
            If no results were stored or results and history differ in length.
        """
        self._require_results()

        # min() returns the first minimal element, so ties resolve to the
        # earliest history entry.
        closest = min(
            range(len(self.history)),
            key=lambda i: abs(self._entry_timesteps(self.history[i]) - target),
        )
        return self.all_results[closest]

    def find_by_rmse(self, threshold: float) -> AggregationResult:
        """Find the smallest configuration that achieves a target RMSE.

        Parameters
        ----------
        threshold : float
            Maximum acceptable RMSE.

        Returns
        -------
        AggregationResult
            The stored result with the fewest timesteps among those whose
            RMSE is ``<= threshold``; on ties the earliest one wins.

        Raises
        ------
        ValueError
            If no results were stored, results and history differ in length,
            or no configuration reaches the threshold.
        """
        self._require_results()

        qualifying = [
            i for i, entry in enumerate(self.history) if entry["rmse"] <= threshold
        ]

        if not qualifying:
            raise ValueError(
                f"No configuration achieves RMSE <= {threshold}. "
                f"Best available: {min(h['rmse'] for h in self.history):.4f}"
            )

        smallest = min(
            qualifying, key=lambda i: self._entry_timesteps(self.history[i])
        )
        return self.all_results[smallest]

    def plot(self, show_labels: bool = True, **kwargs: object) -> object:
        """Plot results (RMSE vs timesteps).

        Parameters
        ----------
        show_labels : bool, default True
            If True, hovering a point shows its configuration, timestep
            count and RMSE; otherwise the plain x/y values are shown.
        **kwargs
            Forwarded to ``plotly.graph_objects.Scatter``.

        Returns
        -------
        object
            The ``plotly.graph_objects.Figure`` that was built.
        """
        import plotly.graph_objects as go

        summary = self.summary
        # Build labels column-wise: DataFrame.iterrows() converts every row
        # to a single dtype, which would print the integer counts as floats
        # (e.g. "8.0x24.0").
        hover_text = [
            f"{n_clusters}x{n_segments}<br>"
            f"Timesteps: {timesteps}<br>"
            f"RMSE: {rmse:.4f}"
            for n_clusters, n_segments, timesteps, rmse in zip(
                summary["n_clusters"],
                summary["n_segments"],
                summary["timesteps"],
                summary["rmse"],
            )
        ]

        fig = go.Figure()
        fig.add_trace(
            go.Scatter(
                x=summary["timesteps"],
                y=summary["rmse"],
                mode="lines+markers" if len(summary) > 1 else "markers",
                marker={"size": 10},
                hovertext=hover_text if show_labels else None,
                hoverinfo="text" if show_labels else "x+y",
                **kwargs,
            )
        )
        fig.update_layout(
            title="Tuning Results: Complexity vs Accuracy",
            xaxis_title="Timesteps (n_clusters x n_segments)",
            yaxis_title="RMSE",
            hovermode="closest",
        )
        return fig

    def __len__(self) -> int:
        """Return the number of stored AggregationResults."""
        return len(self.all_results)

    def __getitem__(self, index: int) -> AggregationResult:
        """Return the stored AggregationResult at ``index``."""
        return self.all_results[index]

    def __iter__(self):
        """Iterate over the stored AggregationResults."""
        return iter(self.all_results)

summary property

summary: DataFrame

Summary DataFrame of all tested configurations.

find_by_timesteps

find_by_timesteps(target: int) -> AggregationResult

Find the result closest to a target timestep count.

Source code in src/tsam/tuning.py
def find_by_timesteps(self, target: int) -> AggregationResult:
    """Find the result closest to a target timestep count.

    Parameters
    ----------
    target : int
        Desired number of timesteps.

    Returns
    -------
    AggregationResult
        The stored result whose timestep count is nearest to ``target``;
        on ties the earliest tested configuration wins.

    Raises
    ------
    ValueError
        If no results were stored or results and history differ in length.
    """
    if not self.all_results:
        raise ValueError(
            "No results available. Use save_all_results=True in "
            "find_optimal_combination() or use find_pareto_front() instead."
        )

    if len(self.all_results) != len(self.history):
        raise ValueError(
            f"Results/history mismatch: {len(self.all_results)} results vs "
            f"{len(self.history)} history entries. This may indicate "
            "save_all_results was not enabled."
        )

    best_idx = 0
    best_diff = float("inf")

    for i, h in enumerate(self.history):
        # Prefer a recorded "timesteps" value, matching how summary and
        # find_by_rmse size a configuration; fall back to the product.
        timesteps = h.get("timesteps", h["n_clusters"] * h["n_segments"])
        diff = abs(timesteps - target)
        if diff < best_diff:
            best_diff = diff
            best_idx = i

    return self.all_results[best_idx]

find_by_rmse

find_by_rmse(threshold: float) -> AggregationResult

Find the smallest configuration that achieves a target RMSE.

Source code in src/tsam/tuning.py
def find_by_rmse(self, threshold: float) -> AggregationResult:
    """Find the smallest configuration that achieves a target RMSE.

    Among all history entries whose RMSE is at most ``threshold``, the one
    with the fewest timesteps is selected (the earliest entry on ties) and
    its stored AggregationResult is returned.

    Raises
    ------
    ValueError
        If no results were stored, results and history differ in length,
        or no configuration reaches the threshold.
    """
    stored = self.all_results
    tested = self.history

    if not stored:
        raise ValueError(
            "No results available. Use save_all_results=True in "
            "find_optimal_combination() or use find_pareto_front() instead."
        )

    if len(stored) != len(tested):
        raise ValueError(
            f"Results/history mismatch: {len(self.all_results)} results vs "
            f"{len(self.history)} history entries. This may indicate "
            "save_all_results was not enabled."
        )

    # (timesteps, position) for every entry that satisfies the threshold
    qualifying = [
        (entry.get("timesteps", entry["n_clusters"] * entry["n_segments"]), position)
        for position, entry in enumerate(tested)
        if entry["rmse"] <= threshold
    ]

    if not qualifying:
        raise ValueError(
            f"No configuration achieves RMSE <= {threshold}. "
            f"Best available: {min(h['rmse'] for h in self.history):.4f}"
        )

    # min() keeps the first of equally small entries, like a stable sort
    _, winner = min(qualifying, key=lambda pair: pair[0])
    return stored[winner]

plot

plot(show_labels: bool = True, **kwargs: object) -> object

Plot results (RMSE vs timesteps).

Source code in src/tsam/tuning.py
def plot(self, show_labels: bool = True, **kwargs: object) -> object:
    """Plot results (RMSE vs timesteps).

    Parameters
    ----------
    show_labels : bool, default True
        If True, hovering a point shows its configuration, timestep count
        and RMSE; otherwise the plain x/y values are shown.
    **kwargs
        Forwarded to ``plotly.graph_objects.Scatter``.

    Returns
    -------
    object
        The ``plotly.graph_objects.Figure`` that was built.
    """
    import plotly.graph_objects as go

    summary = self.summary
    # Build labels column-wise: DataFrame.iterrows() converts every row to a
    # single dtype, which would print the integer counts as floats
    # (e.g. "8.0x24.0").
    hover_text = [
        f"{n_clusters}x{n_segments}<br>"
        f"Timesteps: {timesteps}<br>"
        f"RMSE: {rmse:.4f}"
        for n_clusters, n_segments, timesteps, rmse in zip(
            summary["n_clusters"],
            summary["n_segments"],
            summary["timesteps"],
            summary["rmse"],
        )
    ]

    fig = go.Figure()
    fig.add_trace(
        go.Scatter(
            x=summary["timesteps"],
            y=summary["rmse"],
            mode="lines+markers" if len(summary) > 1 else "markers",
            marker={"size": 10},
            hovertext=hover_text if show_labels else None,
            hoverinfo="text" if show_labels else "x+y",
            **kwargs,
        )
    )
    fig.update_layout(
        title="Tuning Results: Complexity vs Accuracy",
        xaxis_title="Timesteps (n_clusters x n_segments)",
        yaxis_title="RMSE",
        hovermode="closest",
    )
    return fig

find_clusters_for_reduction

find_clusters_for_reduction(
    n_timesteps: int, n_segments: int, data_reduction: float
) -> int

Calculate max clusters for a target data reduction.

Parameters:

Name Type Description Default
n_timesteps int

Number of original timesteps.

required
n_segments int

Number of segments per period.

required
data_reduction float

Target reduction factor (e.g., 0.1 for 10% of original size).

required

Returns:

Type Description
int

Maximum number of clusters that achieves the reduction.

Examples:

>>> find_clusters_for_reduction(8760, 24, 0.01)  # 1% of hourly year
3
Source code in src/tsam/tuning.py
def find_clusters_for_reduction(
    n_timesteps: int,
    n_segments: int,
    data_reduction: float,
) -> int:
    """Return the largest cluster count that stays within a size budget.

    The budget is ``data_reduction * n_timesteps`` aggregated timesteps;
    dividing it by the segments per period and rounding down gives the
    number of typical periods that still fits.

    Parameters
    ----------
    n_timesteps : int
        Number of original timesteps.
    n_segments : int
        Number of segments per period.
    data_reduction : float
        Target reduction factor (e.g., 0.1 for 10% of original size).

    Returns
    -------
    int
        Maximum number of clusters that achieves the reduction.

    Examples
    --------
    >>> find_clusters_for_reduction(8760, 24, 0.01)  # 1% of hourly year
    3
    """
    timestep_budget = data_reduction * float(n_timesteps)
    clusters_that_fit = np.floor(timestep_budget / n_segments)
    return int(clusters_that_fit)

find_segments_for_reduction

find_segments_for_reduction(
    n_timesteps: int, n_clusters: int, data_reduction: float
) -> int

Calculate max segments for a target data reduction.

Parameters:

Name Type Description Default
n_timesteps int

Number of original timesteps.

required
n_clusters int

Number of typical periods.

required
data_reduction float

Target reduction factor (e.g., 0.1 for 10% of original size).

required

Returns:

Type Description
int

Maximum number of segments that achieves the reduction.

Examples:

>>> find_segments_for_reduction(8760, 8, 0.01)  # 1% with 8 periods
10
Source code in src/tsam/tuning.py
def find_segments_for_reduction(
    n_timesteps: int,
    n_clusters: int,
    data_reduction: float,
) -> int:
    """Return the largest segment count that stays within a size budget.

    The budget is ``data_reduction * n_timesteps`` aggregated timesteps;
    dividing it by the number of typical periods and rounding down gives
    the segments per period that still fit.

    Parameters
    ----------
    n_timesteps : int
        Number of original timesteps.
    n_clusters : int
        Number of typical periods.
    data_reduction : float
        Target reduction factor (e.g., 0.1 for 10% of original size).

    Returns
    -------
    int
        Maximum number of segments that achieves the reduction.

    Examples
    --------
    >>> find_segments_for_reduction(8760, 8, 0.01)  # 1% with 8 periods
    10
    """
    timestep_budget = data_reduction * float(n_timesteps)
    segments_that_fit = np.floor(timestep_budget / n_clusters)
    return int(segments_that_fit)

find_optimal_combination

find_optimal_combination(
    data: DataFrame,
    data_reduction: float,
    *,
    period_duration: int | float | str = 24,
    temporal_resolution: float | str | None = None,
    cluster: ClusterConfig | None = None,
    segment_representation: RepresentationMethod = "mean",
    extremes: ExtremeConfig | None = None,
    weights: dict[str, float] | None = None,
    preserve_column_means: bool = True,
    round_decimals: int | None = None,
    numerical_tolerance: float = 1e-13,
    show_progress: bool = True,
    save_all_results: bool = False,
    n_jobs: int | None = None,
) -> TuningResult

Find optimal period/segment combination for a target data reduction.

Searches the Pareto-optimal frontier of period/segment combinations that achieve the specified data reduction, returning the one with minimum RMSE.

Parameters:

Name Type Description Default
data DataFrame

Input time series data.

required
data_reduction float

Target reduction factor (e.g., 0.01 for 1% of original size).

required
period_duration int, float, or str

Length of each period. Accepts either an int/float giving the length in hours (e.g., 24 for daily, 168 for weekly) or a str holding a pandas Timedelta string (e.g., '24h', '1d', '1w').

24
temporal_resolution float or str

Time resolution of input data. Accepts either a float giving the resolution in hours (e.g., 1.0 for hourly, 0.25 for 15-minute) or a str holding a pandas Timedelta string (e.g., '1h', '15min', '30min'). If not provided, it is inferred from the datetime index.

None
cluster ClusterConfig

Clustering configuration.

None
segment_representation str

How to represent each segment: "mean" or "medoid".

"mean"
extremes ExtremeConfig

Configuration for preserving extreme periods.

None
weights dict[str, float]

Per-column weights that influence all pipeline stages.

None
preserve_column_means bool

Whether to rescale results to preserve original column means.

True
round_decimals int

Round results to this many decimal places.

None
numerical_tolerance float

Numerical tolerance for floating-point comparisons.

1e-13
show_progress bool

Show progress bar during search.

True
save_all_results bool

If True, save all AggregationResults in all_results attribute. Useful for detailed analysis but increases memory usage.

False
n_jobs int

Number of parallel jobs. If None or 1, runs sequentially. Use -1 for all available CPUs, or a positive integer for a specific number of workers. Parallel execution uses a file-based approach where data is saved to a temp file and workers load from disk - no DataFrame pickling, safe for sensitive data.

None

Returns:

Type Description
TuningResult

Result containing optimal parameters and history.

Examples:

>>> result = find_optimal_combination(df, data_reduction=0.01)
>>> print(f"Optimal: {result.n_clusters} periods, "
...       f"{result.n_segments} segments")
>>> # Use all CPUs for faster search (file-based, no DataFrame pickling)
>>> result = find_optimal_combination(df, data_reduction=0.01, n_jobs=-1)
Source code in src/tsam/tuning.py
def find_optimal_combination(
    data: pd.DataFrame,
    data_reduction: float,
    *,
    period_duration: int | float | str = 24,
    temporal_resolution: float | str | None = None,
    cluster: ClusterConfig | None = None,
    segment_representation: RepresentationMethod = "mean",
    extremes: ExtremeConfig | None = None,
    weights: dict[str, float] | None = None,
    preserve_column_means: bool = True,
    round_decimals: int | None = None,
    numerical_tolerance: float = 1e-13,
    show_progress: bool = True,
    save_all_results: bool = False,
    n_jobs: int | None = None,
) -> TuningResult:
    """Find optimal period/segment combination for a target data reduction.

    Searches the Pareto-optimal frontier of period/segment combinations
    that achieve the specified data reduction, returning the one with
    minimum RMSE.

    Parameters
    ----------
    data : pd.DataFrame
        Input time series data.
    data_reduction : float
        Target reduction factor (e.g., 0.01 for 1% of original size).
    period_duration : int, float, or str, default 24
        Length of each period. Accepts:
        - int/float: hours (e.g., 24 for daily, 168 for weekly)
        - str: pandas Timedelta string (e.g., '24h', '1d', '1w')
    temporal_resolution : float or str, optional
        Time resolution of input data. Accepts:
        - float: hours (e.g., 1.0 for hourly, 0.25 for 15-minute)
        - str: pandas Timedelta string (e.g., '1h', '15min', '30min')
        If not provided, inferred from the datetime index.
    cluster : ClusterConfig, optional
        Clustering configuration.
    segment_representation : str, default "mean"
        How to represent each segment: "mean" or "medoid".
    extremes : ExtremeConfig, optional
        Configuration for preserving extreme periods.
    weights : dict[str, float], optional
        Per-column weights that influence all pipeline stages.
    preserve_column_means : bool, default True
        Whether to rescale results to preserve original column means.
    round_decimals : int, optional
        Round results to this many decimal places.
    numerical_tolerance : float, default 1e-13
        Numerical tolerance for floating-point comparisons.
    show_progress : bool, default True
        Show progress bar during search.
    save_all_results : bool, default False
        If True, save all AggregationResults in all_results attribute.
        Useful for detailed analysis but increases memory usage.
    n_jobs : int, optional
        Number of parallel jobs. If None or 1, runs sequentially.
        Use -1 for all available CPUs, or a positive integer for
        a specific number of workers. Parallel execution uses a file-based
        approach where data is saved to a temp file and workers load from
        disk - no DataFrame pickling, safe for sensitive data.

    Returns
    -------
    TuningResult
        Result containing optimal parameters and history.

    Raises
    ------
    ValueError
        If ``temporal_resolution`` is not positive, if ``period_duration``
        is shorter than one timestep, if ``data`` is shorter than one
        period, or if no tested configuration produced a result.

    Examples
    --------
    >>> result = find_optimal_combination(df, data_reduction=0.01)
    >>> print(f"Optimal: {result.n_clusters} periods, "
    ...       f"{result.n_segments} segments")

    >>> # Use all CPUs for faster search (file-based, no DataFrame pickling)
    >>> result = find_optimal_combination(df, data_reduction=0.01, n_jobs=-1)
    """
    if cluster is None:
        cluster = ClusterConfig()

    # Parse duration parameters to hours
    period_duration_hours = _parse_duration_hours(period_duration, "period_duration")
    temporal_resolution_hours = (
        _parse_duration_hours(temporal_resolution, "temporal_resolution")
        if temporal_resolution is not None
        else _infer_temporal_resolution(data)
    )

    if temporal_resolution_hours <= 0:
        raise ValueError(
            f"temporal_resolution must be positive, got {temporal_resolution_hours}"
        )

    n_timesteps = len(data)
    timesteps_per_period = int(period_duration_hours / temporal_resolution_hours)

    # A period shorter than one timestep would otherwise surface as a bare
    # ZeroDivisionError in the floor division below.
    if timesteps_per_period < 1:
        raise ValueError(
            "period_duration must span at least one timestep, got "
            f"period_duration={period_duration_hours}h with "
            f"temporal_resolution={temporal_resolution_hours}h"
        )

    max_periods = n_timesteps // timesteps_per_period
    max_segments = timesteps_per_period

    # With no complete period the search grid is empty and argmax below
    # would fail with an unrelated-looking message.
    if max_periods < 1:
        raise ValueError(
            f"data has {n_timesteps} timesteps, fewer than one full period "
            f"of {timesteps_per_period} timesteps"
        )

    # Find valid combinations on the Pareto frontier
    possible_segments = np.arange(1, max_segments + 1)
    possible_periods = np.arange(1, max_periods + 1)

    # Rows index segment counts, columns index period counts; each cell is
    # the resulting number of aggregated timesteps (0 where over budget).
    combined_timesteps = np.outer(possible_segments, possible_periods)
    valid_mask = combined_timesteps <= n_timesteps * data_reduction
    valid_timesteps = combined_timesteps * valid_mask

    # For every segment count, mark the period count with the most timesteps
    optimal_periods_idx = np.zeros_like(valid_timesteps, dtype=bool)
    optimal_periods_idx[
        np.arange(valid_timesteps.shape[0]),
        valid_timesteps.argmax(axis=1),
    ] = True

    # For every period count, mark the segment count with the most timesteps
    optimal_segments_idx = np.zeros_like(valid_timesteps, dtype=bool)
    optimal_segments_idx[
        valid_timesteps.argmax(axis=0),
        np.arange(valid_timesteps.shape[1]),
    ] = True

    # NOTE(review): if no cell is within budget, every argmax is 0, so the
    # 1 period x 1 segment configuration is still selected and tested even
    # though it exceeds the requested reduction - confirm this is intended.
    pareto_mask = optimal_periods_idx & optimal_segments_idx
    pareto_points = np.nonzero(pareto_mask)

    configs_to_test = [
        (int(possible_periods[per_idx]), int(possible_segments[seg_idx]))
        for seg_idx, per_idx in zip(pareto_points[0], pareto_points[1])
    ]

    # Bundle fixed aggregate parameters
    aggregate_opts: _AggregateOpts = {
        "period_duration": period_duration_hours,
        "temporal_resolution": temporal_resolution_hours,
        "cluster": cluster,
        "segment_representation": segment_representation,
        "extremes": extremes,
        "weights": weights,
        "preserve_column_means": preserve_column_means,
        "round_decimals": round_decimals,
        "numerical_tolerance": numerical_tolerance,
    }

    n_workers = _get_n_workers(n_jobs)
    results = _test_configs(
        configs_to_test,
        data,
        aggregate_opts,
        n_workers,
        show_progress=show_progress,
        progress_desc="Searching configurations",
    )

    history: list[dict] = []
    all_results: list[AggregationResult] = []
    best_rmse = float("inf")
    best_result = None
    best_periods = 1
    best_segments = 1

    for n_clusters, n_segments, rmse, result in results:
        # Entries without a result are left out of history and all_results
        if result is None:
            continue
        history.append(
            {"n_clusters": n_clusters, "n_segments": n_segments, "rmse": rmse}
        )
        if save_all_results:
            all_results.append(result)
        if rmse < best_rmse:
            best_rmse = rmse
            best_result = result
            best_periods = n_clusters
            best_segments = n_segments

    if best_result is None:
        raise ValueError("No valid configuration found")

    return TuningResult(
        n_clusters=best_periods,
        n_segments=best_segments,
        rmse=best_rmse,
        history=history,
        best_result=best_result,
        all_results=all_results,
    )

find_pareto_front

find_pareto_front(
    data: DataFrame,
    *,
    period_duration: int | float | str = 24,
    temporal_resolution: float | str | None = None,
    max_timesteps: int | None = None,
    timesteps: Sequence[int] | None = None,
    cluster: ClusterConfig | None = None,
    segment_representation: RepresentationMethod = "mean",
    extremes: ExtremeConfig | None = None,
    weights: dict[str, float] | None = None,
    preserve_column_means: bool = True,
    round_decimals: int | None = None,
    numerical_tolerance: float = 1e-13,
    show_progress: bool = True,
    n_jobs: int | None = None,
) -> TuningResult

Find all Pareto-optimal aggregations from 1 period to full resolution.

Uses a steepest-descent approach to efficiently explore the period/segment space, finding configurations that are optimal for their complexity level.

Parameters:

Name Type Description Default
data DataFrame

Input time series data.

required
period_duration int, float, or str

Length of each period. Accepts either an int/float giving the length in hours (e.g., 24 for daily, 168 for weekly) or a str holding a pandas Timedelta string (e.g., '24h', '1d', '1w').

24
temporal_resolution float or str

Time resolution of input data. Accepts either a float giving the resolution in hours (e.g., 1.0 for hourly, 0.25 for 15-minute) or a str holding a pandas Timedelta string (e.g., '1h', '15min', '30min'). If not provided, it is inferred from the datetime index.

None
max_timesteps int

Stop when reaching this many timesteps. If None, explores up to full resolution. Ignored if timesteps is provided.

None
timesteps Sequence[int]

Specific timestep counts to explore. If provided, only evaluates configurations that produce approximately these timestep counts. Useful for faster exploration with large steps or specific ranges. Examples: range(10, 500, 10), [10, 50, 100, 200, 500]

None
cluster ClusterConfig

Clustering configuration.

None
segment_representation str

How to represent each segment: "mean" or "medoid".

"mean"
extremes ExtremeConfig

Configuration for preserving extreme periods.

None
weights dict[str, float]

Per-column weights that influence all pipeline stages.

None
preserve_column_means bool

Whether to rescale results to preserve original column means.

True
round_decimals int

Round results to this many decimal places.

None
numerical_tolerance float

Numerical tolerance for floating-point comparisons.

1e-13
show_progress bool

Show progress bar.

True
n_jobs int

Number of parallel jobs for testing configurations. If None or 1, runs sequentially. Use -1 for all available CPUs. During steepest-descent phase, tests both directions in parallel.

None

Returns:

Type Description
TuningResult

Result object containing Pareto-optimal configurations with convenience methods for analysis and visualization.

Examples:

>>> pareto = find_pareto_front(df, max_timesteps=500)
>>> pareto.summary  # DataFrame of all Pareto-optimal points
>>> pareto.plot()   # Visualize the Pareto front
>>> pareto.find_by_timesteps(100)  # Find config closest to 100 timesteps
>>> pareto.find_by_rmse(0.05)      # Find smallest config with RMSE <= 0.05
>>> # Iterate over AggregationResults
>>> for agg_result in pareto:
...     print(f"RMSE: {agg_result.accuracy.rmse.mean():.4f}")
>>> # Use parallel execution for faster search
>>> pareto = find_pareto_front(df, max_timesteps=500, n_jobs=-1)
>>> # Explore only specific timestep counts (faster)
>>> pareto = find_pareto_front(df, timesteps=range(10, 500, 50))
>>> # Explore a specific list of timestep targets
>>> pareto = find_pareto_front(df, timesteps=[10, 50, 100, 200, 500])
Source code in src/tsam/tuning.py
def find_pareto_front(
    data: pd.DataFrame,
    *,
    period_duration: int | float | str = 24,
    temporal_resolution: float | str | None = None,
    max_timesteps: int | None = None,
    timesteps: Sequence[int] | None = None,
    cluster: ClusterConfig | None = None,
    segment_representation: RepresentationMethod = "mean",
    extremes: ExtremeConfig | None = None,
    weights: dict[str, float] | None = None,
    preserve_column_means: bool = True,
    round_decimals: int | None = None,
    numerical_tolerance: float = 1e-13,
    show_progress: bool = True,
    n_jobs: int | None = None,
) -> TuningResult:
    """Find all Pareto-optimal aggregations from 1 period to full resolution.

    Uses a steepest-descent approach to efficiently explore the
    period/segment space, finding configurations that are optimal
    for their complexity level.

    Parameters
    ----------
    data : pd.DataFrame
        Input time series data.
    period_duration : int, float, or str, default 24
        Length of each period. Accepts:
        - int/float: hours (e.g., 24 for daily, 168 for weekly)
        - str: pandas Timedelta string (e.g., '24h', '1d', '1w')
    temporal_resolution : float or str, optional
        Time resolution of input data. Accepts:
        - float: hours (e.g., 1.0 for hourly, 0.25 for 15-minute)
        - str: pandas Timedelta string (e.g., '1h', '15min', '30min')
        If not provided, inferred from the datetime index.
    max_timesteps : int, optional
        Stop when reaching this many timesteps. If None, explores
        up to full resolution. Ignored if `timesteps` is provided.
    timesteps : Sequence[int], optional
        Specific timestep counts to explore. If provided, only evaluates
        configurations that produce approximately these timestep counts.
        Useful for faster exploration with large steps or specific ranges.
        Examples: range(10, 500, 10), [10, 50, 100, 200, 500]
    cluster : ClusterConfig, optional
        Clustering configuration.
    segment_representation : str, default "mean"
        How to represent each segment: "mean" or "medoid".
    extremes : ExtremeConfig, optional
        Configuration for preserving extreme periods.
    weights : dict[str, float], optional
        Per-column weights that influence all pipeline stages.
    preserve_column_means : bool, default True
        Whether to rescale results to preserve original column means.
    round_decimals : int, optional
        Round results to this many decimal places.
    numerical_tolerance : float, default 1e-13
        Numerical tolerance for floating-point comparisons.
    show_progress : bool, default True
        Show progress bar.
    n_jobs : int, optional
        Number of parallel jobs for testing configurations.
        If None or 1, runs sequentially. Use -1 for all available CPUs.
        During steepest-descent phase, tests both directions in parallel.

    Returns
    -------
    TuningResult
        Result object containing Pareto-optimal configurations with
        convenience methods for analysis and visualization.

    Raises
    ------
    ValueError
        If ``temporal_resolution`` is not positive or ``period_duration``
        is shorter than one timestep.

    Examples
    --------
    >>> pareto = find_pareto_front(df, max_timesteps=500)
    >>> pareto.summary  # DataFrame of all Pareto-optimal points
    >>> pareto.plot()   # Visualize the Pareto front
    >>> pareto.find_by_timesteps(100)  # Find config closest to 100 timesteps
    >>> pareto.find_by_rmse(0.05)      # Find smallest config with RMSE <= 0.05

    >>> # Iterate over AggregationResults
    >>> for agg_result in pareto:
    ...     print(f"RMSE: {agg_result.accuracy.rmse.mean():.4f}")

    >>> # Use parallel execution for faster search
    >>> pareto = find_pareto_front(df, max_timesteps=500, n_jobs=-1)

    >>> # Explore only specific timestep counts (faster)
    >>> pareto = find_pareto_front(df, timesteps=range(10, 500, 50))

    >>> # Explore a specific list of timestep targets
    >>> pareto = find_pareto_front(df, timesteps=[10, 50, 100, 200, 500])
    """
    if cluster is None:
        cluster = ClusterConfig()

    # Parse duration parameters to hours
    period_duration_hours = _parse_duration_hours(period_duration, "period_duration")
    temporal_resolution_hours = (
        _parse_duration_hours(temporal_resolution, "temporal_resolution")
        if temporal_resolution is not None
        else _infer_temporal_resolution(data)
    )

    if temporal_resolution_hours <= 0:
        raise ValueError(
            f"temporal_resolution must be positive, got {temporal_resolution_hours}"
        )

    n_timesteps = len(data)
    timesteps_per_period = int(period_duration_hours / temporal_resolution_hours)

    # A period shorter than one timestep would otherwise surface as a bare
    # ZeroDivisionError in the floor division below.
    if timesteps_per_period < 1:
        raise ValueError(
            "period_duration must span at least one timestep, got "
            f"period_duration={period_duration_hours}h with "
            f"temporal_resolution={temporal_resolution_hours}h"
        )

    max_periods = n_timesteps // timesteps_per_period
    max_segments = timesteps_per_period

    # Without an explicit cap, explore up to the original resolution
    if max_timesteps is None:
        max_timesteps = n_timesteps

    # Bundle fixed aggregate parameters
    aggregate_opts: _AggregateOpts = {
        "period_duration": period_duration_hours,
        "temporal_resolution": temporal_resolution_hours,
        "cluster": cluster,
        "segment_representation": segment_representation,
        "extremes": extremes,
        "weights": weights,
        "preserve_column_means": preserve_column_means,
        "round_decimals": round_decimals,
        "numerical_tolerance": numerical_tolerance,
    }

    n_workers = _get_n_workers(n_jobs)

    # If specific timesteps are provided, use targeted exploration
    # (max_timesteps is not forwarded on this path)
    if timesteps is not None:
        return _find_pareto_front_targeted(
            data=data,
            timesteps=timesteps,
            max_periods=max_periods,
            max_segments=max_segments,
            aggregate_opts=aggregate_opts,
            show_progress=show_progress,
            n_workers=n_workers,
        )

    # Steepest descent exploration
    return _find_pareto_front_steepest(
        data=data,
        max_periods=max_periods,
        max_segments=max_segments,
        max_timesteps=max_timesteps,
        aggregate_opts=aggregate_opts,
        show_progress=show_progress,
        n_workers=n_workers,
    )