src/mine_sim/aggregate.py

โ† Back to submission ยท View raw on GitHub

"""Cross-replication KPI aggregation with Student-t 95% confidence intervals.

Where :mod:`mine_sim.metrics` produces one frozen
:class:`~mine_sim.metrics.ReplicationMetrics` per shift, this module
collapses a list of them into a single :class:`ScenarioSummary` carrying the
mean and Student-t (n-1) 95% CI for every reported KPI.

Design contracts:

* CIs are ``mean +/- t_{n-1, 0.975} * s / sqrt(n)`` via
  :func:`scipy.stats.t.ppf`. Degenerate cases (n < 2 or zero variance)
  collapse the half-width to zero (both bounds equal the mean) rather than
  producing NaN, so JSON serialisation stays numeric.
* ``top_bottlenecks`` is ranked by the composite score
  ``mean(utilisation) * mean(queue_wait_min)``. Loaders, the crusher, and
  every capacity-1 edge participate in one ranking.
* All return types are immutable so a summary is safe to share across
  writers.
* No I/O here โ€” callers pass already-loaded records and get a dataclass back.
"""

from __future__ import annotations

import math
from dataclasses import dataclass
from types import MappingProxyType
from typing import Iterable, Mapping, Sequence

from scipy import stats  # type: ignore[import-untyped]

from mine_sim.metrics import ReplicationMetrics
from mine_sim.runner import ReplicationResult

#: Two-sided confidence level.
DEFAULT_CONFIDENCE_LEVEL: float = 0.95


@dataclass(frozen=True)
class StatSummary:
    """Mean + Student-t CI bundle for one KPI series.

    All numeric fields are guaranteed finite; degenerate cases yield
    ``ci95_low == ci95_high == mean`` rather than NaN.
    """

    mean: float
    ci95_low: float
    ci95_high: float
    std: float
    n: int

    @property
    def half_width(self) -> float:
        return self.ci95_high - self.mean


def _sample_std(values: Sequence[float]) -> float:
    """Sample standard deviation with n-1 degrees of freedom."""
    n = len(values)
    if n < 2:
        return 0.0
    mean = sum(values) / n
    sse = sum((v - mean) ** 2 for v in values)
    return math.sqrt(sse / (n - 1))


def student_t_ci_95(
    values: Sequence[float],
    *,
    confidence: float = DEFAULT_CONFIDENCE_LEVEL,
) -> StatSummary:
    """Return mean and Student-t (n-1) two-sided CI for ``values``.

    * n == 0 -> all-zero summary (avoids NaN in JSON).
    * n == 1 or zero variance -> half-width 0 (``mean +/- 0``).
    * Otherwise uses :func:`scipy.stats.t.ppf` for the critical value.
    """
    if not 0.0 < confidence < 1.0:
        raise ValueError(
            f"confidence must be strictly between 0 and 1, got {confidence}"
        )
    n = len(values)
    if n == 0:
        return StatSummary(mean=0.0, ci95_low=0.0, ci95_high=0.0, std=0.0, n=0)

    mean = sum(values) / n
    if n < 2:
        return StatSummary(mean=mean, ci95_low=mean, ci95_high=mean, std=0.0, n=n)

    std = _sample_std(values)
    if std == 0.0:
        return StatSummary(mean=mean, ci95_low=mean, ci95_high=mean, std=0.0, n=n)

    alpha = 1.0 - confidence
    t_crit = float(stats.t.ppf(1.0 - alpha / 2.0, df=n - 1))
    half_width = t_crit * std / math.sqrt(n)
    return StatSummary(
        mean=mean,
        ci95_low=mean - half_width,
        ci95_high=mean + half_width,
        std=std,
        n=n,
    )


@dataclass(frozen=True)
class BottleneckEntry:
    """One row of the ``top_bottlenecks`` ranking.

    ``composite_score = utilisation_mean * mean_queue_wait_min``.
    """

    resource_id: str
    resource_kind: str  # "loader" | "crusher" | "edge"
    utilisation_mean: float
    mean_queue_wait_min: float
    composite_score: float


@dataclass(frozen=True)
class LoaderSummary:
    loader_id: str
    utilisation: StatSummary
    mean_queue_wait_min: StatSummary
    services_completed: StatSummary


@dataclass(frozen=True)
class CrusherSummary:
    dump_id: str
    utilisation: StatSummary
    mean_queue_wait_min: StatSummary
    services_completed: StatSummary


@dataclass(frozen=True)
class EdgeSummary:
    edge_id: str
    utilisation: StatSummary
    mean_queue_wait_min: StatSummary
    mean_traversal_time_min: StatSummary
    traversal_count: StatSummary


@dataclass(frozen=True)
class ScenarioSummary:
    """Cross-replication KPI summary for one scenario."""

    scenario_id: str
    replications: int
    shift_length_hours: float

    total_tonnes_delivered: StatSummary
    tonnes_per_hour: StatSummary
    average_truck_cycle_time_min: StatSummary
    average_truck_utilisation: StatSummary
    crusher_utilisation: StatSummary
    average_loader_queue_time_min: StatSummary
    average_crusher_queue_time_min: StatSummary

    loaders: Mapping[str, LoaderSummary]
    crusher: CrusherSummary
    edges: Mapping[str, EdgeSummary]

    top_bottlenecks: tuple[BottleneckEntry, ...]


@dataclass(frozen=True)
class RunSummary:
    """A flat collection of :class:`ScenarioSummary` keyed by scenario_id."""

    scenarios: Mapping[str, ScenarioSummary]

    @property
    def scenario_ids(self) -> tuple[str, ...]:
        return tuple(self.scenarios.keys())


# ---------------------------------------------------------------------------
# Aggregation helpers
# ---------------------------------------------------------------------------
def _coerce_metrics(
    reps: Sequence[ReplicationMetrics] | Sequence[ReplicationResult],
) -> tuple[ReplicationMetrics, ...]:
    """Accept either ``ReplicationMetrics`` or ``ReplicationResult``."""
    out: list[ReplicationMetrics] = []
    for rep in reps:
        if isinstance(rep, ReplicationMetrics):
            out.append(rep)
        elif hasattr(rep, "metrics") and isinstance(rep.metrics, ReplicationMetrics):
            out.append(rep.metrics)
        else:
            raise TypeError(
                "Aggregation expected ReplicationMetrics or ReplicationResult, "
                f"got {type(rep).__name__}"
            )
    return tuple(out)


def _series(values: Iterable[float]) -> StatSummary:
    return student_t_ci_95(tuple(float(v) for v in values))


def _validate_homogeneous_scenario(reps: Sequence[ReplicationMetrics]) -> str:
    if not reps:
        raise ValueError("Cannot aggregate an empty replication sequence.")
    scenario_ids = {rep.scenario_id for rep in reps}
    if len(scenario_ids) != 1:
        raise ValueError(
            "All replications passed to aggregate_scenario must share one "
            f"scenario_id; got {sorted(scenario_ids)}"
        )
    return next(iter(scenario_ids))


def _resource_id_universe(
    reps: Sequence[ReplicationMetrics],
    attr: str,
) -> tuple[str, ...]:
    """Stable sorted union of resource ids across replications."""
    ids: set[str] = set()
    for rep in reps:
        ids.update(getattr(rep, attr).keys())
    return tuple(sorted(ids))


def _loader_series(reps, loader_id, extractor) -> StatSummary:
    return _series(
        extractor(rep.loaders[loader_id]) for rep in reps if loader_id in rep.loaders
    )


def _edge_series(reps, edge_id, extractor) -> StatSummary:
    return _series(
        extractor(rep.edges[edge_id]) for rep in reps if edge_id in rep.edges
    )


def _build_loader_summaries(reps) -> dict[str, LoaderSummary]:
    out: dict[str, LoaderSummary] = {}
    for loader_id in _resource_id_universe(reps, "loaders"):
        out[loader_id] = LoaderSummary(
            loader_id=loader_id,
            utilisation=_loader_series(reps, loader_id, lambda lm: lm.utilisation),
            mean_queue_wait_min=_loader_series(
                reps, loader_id, lambda lm: lm.mean_queue_wait_min
            ),
            services_completed=_loader_series(
                reps, loader_id, lambda lm: lm.services_completed
            ),
        )
    return out


def _build_edge_summaries(reps) -> dict[str, EdgeSummary]:
    out: dict[str, EdgeSummary] = {}
    for edge_id in _resource_id_universe(reps, "edges"):
        out[edge_id] = EdgeSummary(
            edge_id=edge_id,
            utilisation=_edge_series(reps, edge_id, lambda em: em.utilisation),
            mean_queue_wait_min=_edge_series(
                reps, edge_id, lambda em: em.mean_queue_wait_min
            ),
            mean_traversal_time_min=_edge_series(
                reps, edge_id, lambda em: em.mean_traversal_time_min
            ),
            traversal_count=_edge_series(reps, edge_id, lambda em: em.traversal_count),
        )
    return out


def _rank_bottlenecks(
    loaders: Mapping[str, LoaderSummary],
    crusher: CrusherSummary,
    edges: Mapping[str, EdgeSummary],
    top_n: int,
) -> tuple[BottleneckEntry, ...]:
    """Rank loaders + crusher + edges by composite score (util x queue wait)."""
    entries: list[BottleneckEntry] = []

    def add(resource_id: str, kind: str, util: float, wait: float) -> None:
        entries.append(
            BottleneckEntry(
                resource_id=resource_id,
                resource_kind=kind,
                utilisation_mean=util,
                mean_queue_wait_min=wait,
                composite_score=util * wait,
            )
        )

    for loader_id, summary in loaders.items():
        add(loader_id, "loader", summary.utilisation.mean, summary.mean_queue_wait_min.mean)
    add(
        crusher.dump_id,
        "crusher",
        crusher.utilisation.mean,
        crusher.mean_queue_wait_min.mean,
    )
    for edge_id, summary in edges.items():
        add(edge_id, "edge", summary.utilisation.mean, summary.mean_queue_wait_min.mean)

    # Sort: composite desc, then util desc, then id asc for full determinism.
    entries.sort(key=lambda b: (-b.composite_score, -b.utilisation_mean, b.resource_id))
    return tuple(entries[: max(0, int(top_n))])


def aggregate_scenario(
    replications: Sequence[ReplicationMetrics] | Sequence[ReplicationResult],
    *,
    top_bottleneck_count: int = 5,
) -> ScenarioSummary:
    """Aggregate one scenario's replications into a :class:`ScenarioSummary`."""
    reps = _coerce_metrics(replications)
    scenario_id = _validate_homogeneous_scenario(reps)
    shift_length_hours = reps[0].shift_length_min / 60.0

    crusher_util = _series(rep.crusher.utilisation for rep in reps)
    crusher_queue = _series(rep.average_crusher_queue_time_min for rep in reps)

    loaders = _build_loader_summaries(reps)
    edges = _build_edge_summaries(reps)
    crusher = CrusherSummary(
        dump_id=reps[0].crusher.dump_id,
        utilisation=crusher_util,
        mean_queue_wait_min=crusher_queue,
        services_completed=_series(rep.crusher.services_completed for rep in reps),
    )

    return ScenarioSummary(
        scenario_id=scenario_id,
        replications=len(reps),
        shift_length_hours=shift_length_hours,
        total_tonnes_delivered=_series(rep.total_tonnes_delivered for rep in reps),
        tonnes_per_hour=_series(rep.tonnes_per_hour for rep in reps),
        average_truck_cycle_time_min=_series(
            rep.average_truck_cycle_time_min for rep in reps
        ),
        average_truck_utilisation=_series(
            rep.average_truck_utilisation for rep in reps
        ),
        crusher_utilisation=crusher_util,
        average_loader_queue_time_min=_series(
            rep.average_loader_queue_time_min for rep in reps
        ),
        average_crusher_queue_time_min=crusher_queue,
        loaders=MappingProxyType(loaders),
        crusher=crusher,
        edges=MappingProxyType(edges),
        top_bottlenecks=_rank_bottlenecks(loaders, crusher, edges, top_bottleneck_count),
    )


def aggregate_run(
    run: Mapping[str, Sequence[ReplicationMetrics] | Sequence[ReplicationResult]],
    *,
    top_bottleneck_count: int = 5,
) -> RunSummary:
    """Aggregate a multi-scenario run into a :class:`RunSummary`."""
    summaries: dict[str, ScenarioSummary] = {}
    for scenario_id, reps in run.items():
        summary = aggregate_scenario(reps, top_bottleneck_count=top_bottleneck_count)
        if summary.scenario_id != scenario_id:
            raise ValueError(
                f"Scenario id mismatch: key '{scenario_id}' vs metrics "
                f"'{summary.scenario_id}'. Refusing to mis-key the summary."
            )
        summaries[scenario_id] = summary
    return RunSummary(scenarios=MappingProxyType(summaries))


__all__ = [
    "BottleneckEntry",
    "CrusherSummary",
    "DEFAULT_CONFIDENCE_LEVEL",
    "EdgeSummary",
    "LoaderSummary",
    "RunSummary",
    "ScenarioSummary",
    "StatSummary",
    "aggregate_run",
    "aggregate_scenario",
    "student_t_ci_95",
]