src/mine_sim/aggregate.py

← Back to submission · View raw on GitHub

"""Cross-replication KPI aggregation with Student-t 95% confidence intervals.

This is the "horizontal" counterpart to :mod:`mine_sim.metrics`. Where
``metrics`` produces one frozen :class:`~mine_sim.metrics.ReplicationMetrics`
per shift, this module collapses a list of such records into a single
:class:`ScenarioSummary` carrying the mean and Student-t n-1 95% CI for
every KPI the Seed asks us to report.

Design contracts (Seed-derived):

* Confidence intervals are computed as ``mean ± t_{n-1, 0.975} * s / sqrt(n)``
  using Student's t distribution (:func:`scipy.stats.t.ppf`). For
  degenerate cases (n < 2 or sample variance == 0) the half-width
  collapses to zero — both bounds equal the mean — rather than NaN, so
  downstream JSON serialisation remains numeric.
* ``top_bottlenecks`` is ranked by ``mean(utilisation) * mean(queue_wait)``
  across replications, the "composite bottleneck score" decided in the
  pre-implementation interview. Loaders, the crusher, and every
  capacity-1 edge participate in the same ranking.
* All return types are immutable (frozen dataclasses + ``MappingProxyType``)
  so a summary is safe to share across writers (CSV, JSON, README
  rendering) without anyone accidentally mutating a value mid-write.
* No I/O lives in this module — that is the next sub-AC's job. Callers
  hand in already-loaded :class:`ReplicationMetrics` records and get a
  pure-Python dataclass back.

The public surface is intentionally small:

* :func:`student_t_ci_95` — given any sequence of floats, returns a
  :class:`StatSummary(mean, ci_low, ci_high, std, n)`. Used by the
  scenario aggregator and re-exported for downstream tooling.
* :func:`aggregate_scenario` — takes a sequence of
  :class:`ReplicationMetrics` (all from the same scenario), returns a
  :class:`ScenarioSummary`.
* :func:`aggregate_run` — takes a :class:`MultiScenarioRunResult` (or a
  mapping of scenario_id -> reps) and returns a :class:`RunSummary`
  carrying one :class:`ScenarioSummary` per scenario.
"""

from __future__ import annotations

import math
from dataclasses import dataclass, field
from types import MappingProxyType
from typing import Iterable, Mapping, Sequence

from scipy import stats  # type: ignore[import-untyped]

from mine_sim.metrics import ReplicationMetrics
from mine_sim.runner import ReplicationResult

# ---------------------------------------------------------------------------
# Constants
# ---------------------------------------------------------------------------
#: Two-sided 95% confidence level — the number repeated in the Seed.
DEFAULT_CONFIDENCE_LEVEL: float = 0.95


# ---------------------------------------------------------------------------
# Statistical primitives
# ---------------------------------------------------------------------------
@dataclass(frozen=True)
class StatSummary:
    """Mean + Student-t CI bundle for a single KPI series.

    All four numeric fields are guaranteed finite: degenerate cases (n<2
    or zero variance) yield ``ci_low == ci_high == mean`` rather than
    NaN.

    Fields are deliberately ordered so the JSON dump reads naturally:
    ``mean`` first, then the bounds, then the supporting stats.
    """

    mean: float
    ci95_low: float
    ci95_high: float
    std: float
    n: int

    @property
    def half_width(self) -> float:
        """Half the CI width (``ci95_high - mean``)."""
        return self.ci95_high - self.mean


def _sample_std(values: Sequence[float]) -> float:
    """Sample standard deviation with n-1 degrees of freedom.

    Avoids ``numpy`` so this module's scientific dependency surface stays
    at ``scipy.stats`` only.
    """
    n = len(values)
    if n < 2:
        return 0.0
    mean = sum(values) / n
    sse = sum((v - mean) ** 2 for v in values)
    return math.sqrt(sse / (n - 1))


def student_t_ci_95(
    values: Sequence[float],
    *,
    confidence: float = DEFAULT_CONFIDENCE_LEVEL,
) -> StatSummary:
    """Return mean and Student-t (n-1) two-sided CI for ``values``.

    Parameters
    ----------
    values:
        Iterable of finite floats — typically one entry per replication.
    confidence:
        Two-sided confidence level (default 0.95).

    Notes
    -----
    * For n == 0 we return an all-zero summary so downstream JSON does
      not contain ``NaN``. Callers that care can inspect ``n == 0``.
    * For n == 1 or zero sample variance the half-width is zero by
      definition; the function returns ``mean ± 0``.
    * The critical value uses :func:`scipy.stats.t.ppf` so the result
      matches every standard simulation textbook within float precision.
    """
    if not 0.0 < confidence < 1.0:
        raise ValueError(
            f"confidence must be strictly between 0 and 1, got {confidence}"
        )
    n = len(values)
    if n == 0:
        return StatSummary(mean=0.0, ci95_low=0.0, ci95_high=0.0, std=0.0, n=0)

    mean = sum(values) / n
    if n < 2:
        return StatSummary(mean=mean, ci95_low=mean, ci95_high=mean, std=0.0, n=n)

    std = _sample_std(values)
    if std == 0.0:
        return StatSummary(mean=mean, ci95_low=mean, ci95_high=mean, std=0.0, n=n)

    alpha = 1.0 - confidence
    t_crit = float(stats.t.ppf(1.0 - alpha / 2.0, df=n - 1))
    half_width = t_crit * std / math.sqrt(n)
    return StatSummary(
        mean=mean,
        ci95_low=mean - half_width,
        ci95_high=mean + half_width,
        std=std,
        n=n,
    )


# ---------------------------------------------------------------------------
# Bottleneck ranking
# ---------------------------------------------------------------------------
@dataclass(frozen=True)
class BottleneckEntry:
    """One row of the ``top_bottlenecks`` ranking.

    The composite score is ``utilisation_mean * mean_queue_wait_min`` —
    the same definition recorded in the design memory.
    """

    resource_id: str
    resource_kind: str  # "loader" | "crusher" | "edge"
    utilisation_mean: float
    mean_queue_wait_min: float
    composite_score: float


# ---------------------------------------------------------------------------
# Per-resource summary records
# ---------------------------------------------------------------------------
@dataclass(frozen=True)
class LoaderSummary:
    """Aggregated KPIs for a single loader across replications."""

    loader_id: str
    utilisation: StatSummary
    mean_queue_wait_min: StatSummary
    services_completed: StatSummary


@dataclass(frozen=True)
class CrusherSummary:
    """Aggregated KPIs for the crusher across replications."""

    dump_id: str
    utilisation: StatSummary
    mean_queue_wait_min: StatSummary
    services_completed: StatSummary


@dataclass(frozen=True)
class EdgeSummary:
    """Aggregated KPIs for one capacity-1 edge across replications."""

    edge_id: str
    utilisation: StatSummary
    mean_queue_wait_min: StatSummary
    mean_traversal_time_min: StatSummary
    traversal_count: StatSummary


# ---------------------------------------------------------------------------
# Top-level scenario summary
# ---------------------------------------------------------------------------
@dataclass(frozen=True)
class ScenarioSummary:
    """Cross-replication KPI summary for one scenario.

    The exact field set is dictated by the Seed AC for ``summary.json``:
    means and Student-t 95% CIs for the headline throughput KPIs, plus
    per-resource utilisation / queue-wait, plus a composite bottleneck
    ranking. Narrative fields (``key_assumptions``, ``model_limitations``,
    ``additional_scenarios_proposed``) are *not* this module's concern —
    they are loaded from disk by the writer.
    """

    scenario_id: str
    replications: int
    shift_length_hours: float

    total_tonnes_delivered: StatSummary
    tonnes_per_hour: StatSummary
    average_truck_cycle_time_min: StatSummary
    average_truck_utilisation: StatSummary
    crusher_utilisation: StatSummary
    average_loader_queue_time_min: StatSummary
    average_crusher_queue_time_min: StatSummary

    loaders: Mapping[str, LoaderSummary]
    crusher: CrusherSummary
    edges: Mapping[str, EdgeSummary]

    top_bottlenecks: tuple[BottleneckEntry, ...]


@dataclass(frozen=True)
class RunSummary:
    """A flat collection of :class:`ScenarioSummary` keyed by scenario_id.

    Iteration order matches the order the scenarios were aggregated
    (which itself matches the run order downstream of
    :func:`mine_sim.scenario_runner.run_all_scenarios`), so JSON dumps
    are deterministic.
    """

    scenarios: Mapping[str, ScenarioSummary]

    @property
    def scenario_ids(self) -> tuple[str, ...]:
        return tuple(self.scenarios.keys())


# ---------------------------------------------------------------------------
# Aggregation helpers
# ---------------------------------------------------------------------------
def _coerce_metrics(
    reps: Sequence[ReplicationMetrics] | Sequence[ReplicationResult],
) -> tuple[ReplicationMetrics, ...]:
    """Accept either ``ReplicationMetrics`` directly or ``ReplicationResult``."""
    out: list[ReplicationMetrics] = []
    for rep in reps:
        if isinstance(rep, ReplicationMetrics):
            out.append(rep)
        elif hasattr(rep, "metrics") and isinstance(rep.metrics, ReplicationMetrics):
            out.append(rep.metrics)
        else:
            raise TypeError(
                "Aggregation expected ReplicationMetrics or ReplicationResult, "
                f"got {type(rep).__name__}"
            )
    return tuple(out)


def _series(values: Iterable[float]) -> StatSummary:
    """Convenience wrapper: build a :class:`StatSummary` from an iterable."""
    return student_t_ci_95(tuple(float(v) for v in values))


def _validate_homogeneous_scenario(reps: Sequence[ReplicationMetrics]) -> str:
    if not reps:
        raise ValueError("Cannot aggregate an empty replication sequence.")
    scenario_ids = {rep.scenario_id for rep in reps}
    if len(scenario_ids) != 1:
        raise ValueError(
            "All replications passed to aggregate_scenario must share the "
            f"same scenario_id; got {sorted(scenario_ids)}"
        )
    return next(iter(scenario_ids))


def _resource_id_universe(
    reps: Sequence[ReplicationMetrics],
    attr: str,
) -> tuple[str, ...]:
    """Stable sorted union of resource IDs seen across replications.

    Different scenarios can have different fleet sizes / edge sets, but
    *within* one scenario every replication should expose the same
    resource ids. We still take the union (not the intersection) so a
    single missing edge does not silently drop a row.
    """
    ids: set[str] = set()
    for rep in reps:
        ids.update(getattr(rep, attr).keys())
    return tuple(sorted(ids))


def _loader_series(
    reps: Sequence[ReplicationMetrics],
    loader_id: str,
    extractor,
) -> StatSummary:
    return _series(extractor(rep.loaders[loader_id]) for rep in reps if loader_id in rep.loaders)


def _edge_series(
    reps: Sequence[ReplicationMetrics],
    edge_id: str,
    extractor,
) -> StatSummary:
    return _series(extractor(rep.edges[edge_id]) for rep in reps if edge_id in rep.edges)


# ---------------------------------------------------------------------------
# Public aggregation API
# ---------------------------------------------------------------------------
def aggregate_scenario(
    replications: Sequence[ReplicationMetrics] | Sequence[ReplicationResult],
    *,
    top_bottleneck_count: int = 5,
) -> ScenarioSummary:
    """Aggregate a single scenario's replications into a :class:`ScenarioSummary`.

    Parameters
    ----------
    replications:
        Sequence of :class:`ReplicationMetrics` or
        :class:`ReplicationResult` records, all sharing the same
        ``scenario_id``. Order is preserved but not semantically
        meaningful — every aggregation is symmetric in the sample.
    top_bottleneck_count:
        How many entries to include in ``top_bottlenecks``. Defaults to
        5 (the table the README renders); increase for diagnostics.

    Raises
    ------
    ValueError:
        If the sequence is empty or contains records from multiple
        scenarios. Failing loudly here prevents silent mis-aggregation.
    """
    reps = _coerce_metrics(replications)
    scenario_id = _validate_homogeneous_scenario(reps)

    shift_length_hours = reps[0].shift_length_min / 60.0

    # ----- Headline throughput KPIs -----------------------------------------
    total_tonnes = _series(rep.total_tonnes_delivered for rep in reps)
    tph = _series(rep.tonnes_per_hour for rep in reps)
    cycle_time = _series(rep.average_truck_cycle_time_min for rep in reps)
    truck_util = _series(rep.average_truck_utilisation for rep in reps)
    crusher_util = _series(rep.crusher.utilisation for rep in reps)
    loader_queue = _series(rep.average_loader_queue_time_min for rep in reps)
    crusher_queue = _series(rep.average_crusher_queue_time_min for rep in reps)

    # ----- Per-loader summaries ---------------------------------------------
    loader_ids = _resource_id_universe(reps, "loaders")
    loader_summaries: dict[str, LoaderSummary] = {}
    for loader_id in loader_ids:
        loader_summaries[loader_id] = LoaderSummary(
            loader_id=loader_id,
            utilisation=_loader_series(reps, loader_id, lambda lm: lm.utilisation),
            mean_queue_wait_min=_loader_series(
                reps, loader_id, lambda lm: lm.mean_queue_wait_min
            ),
            services_completed=_loader_series(
                reps, loader_id, lambda lm: lm.services_completed
            ),
        )

    # ----- Crusher summary --------------------------------------------------
    crusher_id = reps[0].crusher.dump_id
    crusher_summary = CrusherSummary(
        dump_id=crusher_id,
        utilisation=crusher_util,
        mean_queue_wait_min=crusher_queue,
        services_completed=_series(rep.crusher.services_completed for rep in reps),
    )

    # ----- Per-edge summaries -----------------------------------------------
    edge_ids = _resource_id_universe(reps, "edges")
    edge_summaries: dict[str, EdgeSummary] = {}
    for edge_id in edge_ids:
        edge_summaries[edge_id] = EdgeSummary(
            edge_id=edge_id,
            utilisation=_edge_series(reps, edge_id, lambda em: em.utilisation),
            mean_queue_wait_min=_edge_series(
                reps, edge_id, lambda em: em.mean_queue_wait_min
            ),
            mean_traversal_time_min=_edge_series(
                reps, edge_id, lambda em: em.mean_traversal_time_min
            ),
            traversal_count=_edge_series(
                reps, edge_id, lambda em: em.traversal_count
            ),
        )

    # ----- Bottleneck ranking ----------------------------------------------
    bottleneck_entries: list[BottleneckEntry] = []
    for loader_id, summary in loader_summaries.items():
        util = summary.utilisation.mean
        wait = summary.mean_queue_wait_min.mean
        bottleneck_entries.append(
            BottleneckEntry(
                resource_id=loader_id,
                resource_kind="loader",
                utilisation_mean=util,
                mean_queue_wait_min=wait,
                composite_score=util * wait,
            )
        )
    bottleneck_entries.append(
        BottleneckEntry(
            resource_id=crusher_id,
            resource_kind="crusher",
            utilisation_mean=crusher_summary.utilisation.mean,
            mean_queue_wait_min=crusher_summary.mean_queue_wait_min.mean,
            composite_score=(
                crusher_summary.utilisation.mean
                * crusher_summary.mean_queue_wait_min.mean
            ),
        )
    )
    for edge_id, summary in edge_summaries.items():
        util = summary.utilisation.mean
        wait = summary.mean_queue_wait_min.mean
        bottleneck_entries.append(
            BottleneckEntry(
                resource_id=edge_id,
                resource_kind="edge",
                utilisation_mean=util,
                mean_queue_wait_min=wait,
                composite_score=util * wait,
            )
        )
    # Sort: primary key composite_score desc, secondary util desc, then id asc
    # for a fully deterministic order (no Python "best effort" tie behaviour).
    bottleneck_entries.sort(
        key=lambda b: (-b.composite_score, -b.utilisation_mean, b.resource_id)
    )
    top_bottlenecks = tuple(bottleneck_entries[: max(0, int(top_bottleneck_count))])

    return ScenarioSummary(
        scenario_id=scenario_id,
        replications=len(reps),
        shift_length_hours=shift_length_hours,
        total_tonnes_delivered=total_tonnes,
        tonnes_per_hour=tph,
        average_truck_cycle_time_min=cycle_time,
        average_truck_utilisation=truck_util,
        crusher_utilisation=crusher_util,
        average_loader_queue_time_min=loader_queue,
        average_crusher_queue_time_min=crusher_queue,
        loaders=MappingProxyType(loader_summaries),
        crusher=crusher_summary,
        edges=MappingProxyType(edge_summaries),
        top_bottlenecks=top_bottlenecks,
    )


def aggregate_run(
    run: Mapping[str, Sequence[ReplicationMetrics] | Sequence[ReplicationResult]],
    *,
    top_bottleneck_count: int = 5,
) -> RunSummary:
    """Aggregate a multi-scenario run into a :class:`RunSummary`.

    Parameters
    ----------
    run:
        Mapping ``scenario_id -> Sequence[ReplicationMetrics|Result]``.
        The natural source is
        ``{sid: r.replications for sid, r in multi.results.items()}``
        from a :class:`MultiScenarioRunResult`. A small adapter
        (:func:`from_multi_scenario_result`) is provided for that case.
    """
    summaries: dict[str, ScenarioSummary] = {}
    for scenario_id, reps in run.items():
        summary = aggregate_scenario(
            reps, top_bottleneck_count=top_bottleneck_count
        )
        if summary.scenario_id != scenario_id:
            raise ValueError(
                f"Scenario id mismatch: key '{scenario_id}' vs metrics "
                f"'{summary.scenario_id}'. Refusing to mis-key the summary."
            )
        summaries[scenario_id] = summary
    return RunSummary(scenarios=MappingProxyType(summaries))


def from_multi_scenario_result(multi) -> RunSummary:  # pragma: no cover - adapter
    """Tiny adapter so callers don't have to inline the dict comprehension.

    Accepts a :class:`mine_sim.scenario_runner.MultiScenarioRunResult` and
    returns the corresponding :class:`RunSummary`. Imported lazily to
    keep this module's import graph minimal.
    """
    return aggregate_run(
        {sid: r.replications for sid, r in multi.results.items()}
    )


__all__ = [
    "BottleneckEntry",
    "CrusherSummary",
    "DEFAULT_CONFIDENCE_LEVEL",
    "EdgeSummary",
    "LoaderSummary",
    "RunSummary",
    "ScenarioSummary",
    "StatSummary",
    "aggregate_run",
    "aggregate_scenario",
    "from_multi_scenario_result",
    "student_t_ci_95",
]